Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/22 00:59:40 UTC

spark git commit: [SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder

Repository: spark
Updated Branches:
  refs/heads/master ef6be7bed -> 1a95397bb


[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder

## What changes were proposed in this pull request?
This patch moves native command and script transformation parsing into SparkSqlAstBuilder. It builds on #12561; see the last commit for the diff.
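
For context, the parsing being moved handles script transformation statements such as the following (this is the TRANSFORM query from the test removed from DDLCommandSuite in this patch):

    SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData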

## How was this patch tested?
Updated test cases to reflect this.

Author: Reynold Xin <rx...@databricks.com>

Closes #12564 from rxin/SPARK-14798.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a95397b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a95397b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a95397b

Branch: refs/heads/master
Commit: 1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd
Parents: ef6be7b
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Apr 21 15:59:37 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 21 15:59:37 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/ScriptTransformation.scala    | 64 ++++++++++++-
 .../spark/sql/execution/SparkSqlParser.scala    | 95 +++++++++++++++++++-
 .../execution/command/HiveNativeCommand.scala   | 35 ++++++++
 .../spark/sql/internal/SessionState.scala       |  2 +-
 .../sql/execution/command/DDLCommandSuite.scala | 20 -----
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  3 +-
 .../spark/sql/hive/HiveQueryExecution.scala     |  4 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  5 +-
 .../org/apache/spark/sql/hive/SQLBuilder.scala  | 10 +--
 .../sql/hive/execution/HiveNativeCommand.scala  | 36 --------
 .../sql/hive/execution/HiveSqlParser.scala      | 84 +----------------
 .../hive/execution/ScriptTransformation.scala   | 67 ++++----------
 .../apache/spark/sql/hive/test/TestHive.scala   |  3 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  3 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |  1 +
 .../sql/hive/execution/HiveComparisonTest.scala |  2 +-
 16 files changed, 222 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
index 578027d..e176e9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -37,7 +37,65 @@ case class ScriptTransformation(
 }
 
 /**
- * A placeholder for implementation specific input and output properties when passing data
- * to a script. For example, in Hive this would specify which SerDes to use.
+ * Input and output properties when passing data to a script.
+ * For example, in Hive this would specify which SerDes to use.
  */
-trait ScriptInputOutputSchema
+case class ScriptInputOutputSchema(
+    inputRowFormat: Seq[(String, String)],
+    outputRowFormat: Seq[(String, String)],
+    inputSerdeClass: Option[String],
+    outputSerdeClass: Option[String],
+    inputSerdeProps: Seq[(String, String)],
+    outputSerdeProps: Seq[(String, String)],
+    recordReaderClass: Option[String],
+    recordWriterClass: Option[String],
+    schemaLess: Boolean) {
+
+  def inputRowFormatSQL: Option[String] =
+    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+  def outputRowFormatSQL: Option[String] =
+    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+  /**
+   * Get the row format specification
+   * Note:
+   * 1. Changes are needed when readerClause and writerClause are supported.
+   * 2. Changes are needed when "ESCAPED BY" is supported.
+   */
+  private def getRowFormatSQL(
+    rowFormat: Seq[(String, String)],
+    serdeClass: Option[String],
+    serdeProps: Seq[(String, String)]): Option[String] = {
+    if (schemaLess) return Some("")
+
+    val rowFormatDelimited =
+      rowFormat.map {
+        case ("TOK_TABLEROWFORMATFIELD", value) =>
+          "FIELDS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+          "COLLECTION ITEMS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+          "MAP KEYS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATLINES", value) =>
+          "LINES TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATNULL", value) =>
+          "NULL DEFINED AS " + value
+        case o => return None
+      }
+
+    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+    val serdePropsSQL =
+      if (serdeClass.nonEmpty) {
+        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+      } else {
+        ""
+      }
+    if (rowFormat.nonEmpty) {
+      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+    } else {
+      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+    }
+  }
+}
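
As a quick illustration (not part of the commit), the new case class can render a delimited row format back into SQL; note the value strings are the token texts produced by the parser, quotes included:

    import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema

    val schema = ScriptInputOutputSchema(
      inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
      outputRowFormat = Nil,
      inputSerdeClass = None,
      outputSerdeClass = None,
      inputSerdeProps = Nil,
      outputSerdeProps = Nil,
      recordReaderClass = None,
      recordWriterClass = None,
      schemaLess = false)

    // Returns Some("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    schema.inputRowFormatSQL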

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ac12a72..05fb1ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 
@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+
 
 /**
  * Concrete parser for Spark SQL statements.
  */
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
   val astBuilder = new SparkSqlAstBuilder(conf)
+
+  private val substitutor = new VariableSubstitution(conf)
+
+  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+    super.parse(substitutor.substitute(command))(toResult)
+  }
+
+  protected override def nativeCommand(sqlText: String): LogicalPlan = {
+    HiveNativeCommand(substitutor.substitute(sqlText))
+  }
 }
 
 /**
@@ -45,6 +57,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   import org.apache.spark.sql.catalyst.parser.ParserUtils._
 
   /**
+   * Pass a command to Hive using a [[HiveNativeCommand]].
+   */
+  override def visitExecuteNativeCommand(
+      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+    HiveNativeCommand(command(ctx))
+  }
+
+  /**
    * Create a [[SetCommand]] logical plan.
    *
    * Note that we assume that everything after the SET keyword is assumed to be a part of the
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         Option(col.STRING).map(string))
     }
   }
+
+  /**
+   * Create a [[ScriptInputOutputSchema]].
+   */
+  override protected def withScriptIOSchema(
+      ctx: QuerySpecificationContext,
+      inRowFormat: RowFormatContext,
+      recordWriter: Token,
+      outRowFormat: RowFormatContext,
+      recordReader: Token,
+      schemaLess: Boolean): ScriptInputOutputSchema = {
+    if (recordWriter != null || recordReader != null) {
+      throw new ParseException(
+        "Unsupported operation: Used defined record reader/writer classes.", ctx)
+    }
+
+    // Decode the input and output formats.
+    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+      case c: RowFormatDelimitedContext =>
+        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+        // expects a seq of pairs in which the old parsers' token names are used as keys.
+        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+        // retrieving the key value pairs ourselves.
+        def entry(key: String, value: Token): Seq[(String, String)] = {
+          Option(value).map(t => key -> t.getText).toSeq
+        }
+        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+        (entries, None, Seq.empty, None)
+
+      case c: RowFormatSerdeContext =>
+        // Use a serde format.
+        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+        // SPARK-10310: Special cases LazySimpleSerDe
+        val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+          Try(conf.getConfString(configKey)).toOption
+        } else {
+          None
+        }
+        (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+      case null =>
+        // Use default (serde) format.
+        val name = conf.getConfString("hive.script.serde",
+          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+        val props = Seq("field.delim" -> "\t")
+        val recordHandler = Try(conf.getConfString(configKey)).toOption
+        (Nil, Option(name), props, recordHandler)
+    }
+
+    val (inFormat, inSerdeClass, inSerdeProps, reader) =
+      format(inRowFormat, "hive.script.recordreader")
+
+    val (outFormat, outSerdeClass, outSerdeProps, writer) =
+      format(outRowFormat, "hive.script.recordwriter")
+
+    ScriptInputOutputSchema(
+      inFormat, outFormat,
+      inSerdeClass, outSerdeClass,
+      inSerdeProps, outSerdeProps,
+      reader, writer,
+      schemaLess)
+  }
 }
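
A rough usage sketch of the relocated parsing (illustrative only; it assumes a SQLConf can be instantiated in scope, and reuses the TRANSFORM query from the removed DDLCommandSuite test):

    import org.apache.spark.sql.execution.SparkSqlParser
    import org.apache.spark.sql.internal.SQLConf

    val parser = new SparkSqlParser(new SQLConf)
    // Variable references such as ${...} are resolved by VariableSubstitution
    // before the text ever reaches the ANTLR parser.
    val plan = parser.parsePlan(
      "SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
    // plan is a logical ScriptTransformation carrying a ScriptInputOutputSchema.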

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
new file mode 100644
index 0000000..39e441f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.types.StringType
+
+/**
+ * A command that we delegate to Hive. Eventually we should remove this.
+ */
+case class HiveNativeCommand(sql: String) extends RunnableCommand {
+
+  override def output: Seq[AttributeReference] =
+    Seq(AttributeReference("result", StringType, nullable = false)())
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.runNativeSql(sql).map(Row(_))
+  }
+}
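
A minimal sketch of the runtime contract (assumes an in-scope sqlContext whose session state actually implements runNativeSql, i.e. a Hive-enabled session; the plain Spark default now throws AnalysisException, per the SessionState change below):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.command.HiveNativeCommand

    val cmd = HiveNativeCommand("SHOW TABLES")
    // Delegates to sessionState.runNativeSql(sql) and wraps each output line
    // in a Row; the schema is a single non-nullable StringType column "result".
    val rows: Seq[Row] = cmd.run(sqlContext)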

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8563dc3..e1be4b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   }
 
   def runNativeSql(sql: String): Seq[String] = {
-    throw new UnsupportedOperationException
+    throw new AnalysisException("Unsupported query: " + sql)
   }
 
 }
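
A Hive-aware session state is expected to override this hook. An illustrative shape, using a stand-in client trait (the names below are hypothetical, not the actual Spark internals):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.internal.SessionState

    // Stand-in for a Hive client able to execute native statements.
    trait NativeSqlClient { def runSqlHive(sql: String): Seq[String] }

    // Sketch only: SessionState is private[sql], so a real override lives
    // inside the spark.sql package tree (e.g. a Hive session state).
    class HiveAwareSessionState(ctx: SQLContext, client: NativeSqlClient)
      extends SessionState(ctx) {
      override def runNativeSql(sql: String): Seq[String] = client.runSqlHive(sql)
    }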

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index e99eb02..a1ffda9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
-  test("unsupported operations") {
-    intercept[ParseException] {
-      parser.parsePlan("DROP TABLE tab PURGE")
-    }
-    intercept[ParseException] {
-      parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
-    }
-    intercept[ParseException] {
-      parser.parsePlan(
-        """
-          |CREATE EXTERNAL TABLE oneToTenDef
-          |USING org.apache.spark.sql.sources
-          |OPTIONS (from '1', to '10')
-        """.stripMargin)
-    }
-    intercept[ParseException] {
-      parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
-    }
-  }
-
   test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") {
     val sql = "SELECT distribute, unset FROM x"
     val parsed = parser.parsePlan(sql)

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8732285..ca39791 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 1c1bfb6..0ee34f0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bbdcc8c..8720e54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
-        ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
+      case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+        val hiveIoSchema = HiveScriptIOSchema(ioschema)
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 2d44813..86115d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
 import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
 
 /**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
   }
 
   private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
-    val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
-    val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+    val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
       throw new UnsupportedOperationException(
-        s"unsupported row format ${ioSchema.inputRowFormat}"))
-    val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+        s"unsupported row format ${plan.ioschema.inputRowFormat}"))
+    val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
       throw new UnsupportedOperationException(
-        s"unsupported row format ${ioSchema.outputRowFormat}"))
+        s"unsupported row format ${plan.ioschema.outputRowFormat}"))
 
     val outputSchema = plan.output.map { attr =>
       s"${attr.sql} ${attr.dataType.simpleString}"

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
deleted file mode 100644
index 8c1f4a8..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.HiveSessionState
-import org.apache.spark.sql.types.StringType
-
-private[hive]
-case class HiveNativeCommand(sql: String) extends RunnableCommand {
-
-  override def output: Seq[AttributeReference] =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 989da92..35530b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -17,16 +17,11 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.util.Try
-
-import org.antlr.v4.runtime.Token
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.HiveNativeCommand
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
 /**
@@ -55,14 +50,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
   import ParserUtils._
 
   /**
-   * Pass a command to Hive using a [[HiveNativeCommand]].
-   */
-  override def visitExecuteNativeCommand(
-      ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
-    HiveNativeCommand(command(ctx))
-  }
-
-  /**
    * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
    * options are passed on to Hive) e.g.:
    * {{{
@@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
     }
   }
-
-  /**
-   * Create a [[HiveScriptIOSchema]].
-   */
-  override protected def withScriptIOSchema(
-      ctx: QuerySpecificationContext,
-      inRowFormat: RowFormatContext,
-      recordWriter: Token,
-      outRowFormat: RowFormatContext,
-      recordReader: Token,
-      schemaLess: Boolean): HiveScriptIOSchema = {
-    if (recordWriter != null || recordReader != null) {
-      throw new ParseException(
-        "Unsupported operation: Used defined record reader/writer classes.", ctx)
-    }
-
-    // Decode and input/output format.
-    type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
-    def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
-      case c: RowFormatDelimitedContext =>
-        // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
-        // expects a seq of pairs in which the old parsers' token names are used as keys.
-        // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
-        // retrieving the key value pairs ourselves.
-        def entry(key: String, value: Token): Seq[(String, String)] = {
-          Option(value).map(t => key -> t.getText).toSeq
-        }
-        val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
-          entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
-          entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
-
-        (entries, None, Seq.empty, None)
-
-      case c: RowFormatSerdeContext =>
-        // Use a serde format.
-        val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
-
-        // SPARK-10310: Special cases LazySimpleSerDe
-        val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
-          Try(conf.getConfString(configKey)).toOption
-        } else {
-          None
-        }
-        (Seq.empty, Option(name), props.toSeq, recordHandler)
-
-      case null =>
-        // Use default (serde) format.
-        val name = conf.getConfString("hive.script.serde",
-          "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-        val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
-        val recordHandler = Try(conf.getConfString(configKey)).toOption
-        (Nil, Option(name), props, recordHandler)
-    }
-
-    val (inFormat, inSerdeClass, inSerdeProps, reader) =
-      format(inRowFormat, "hive.script.recordreader")
-
-    val (outFormat, outSerdeClass, outSerdeProps, writer) =
-      format(outRowFormat, "hive.script.recordwriter")
-
-    HiveScriptIOSchema(
-      inFormat, outFormat,
-      inSerdeClass, outSerdeClass,
-      inSerdeProps, outSerdeProps,
-      reader, writer,
-      schemaLess)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 2f7cec3..8c8becf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread(
   }
 }
 
+private[hive]
+object HiveScriptIOSchema {
+  def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
+    HiveScriptIOSchema(
+      input.inputRowFormat,
+      input.outputRowFormat,
+      input.inputSerdeClass,
+      input.outputSerdeClass,
+      input.inputSerdeProps,
+      input.outputSerdeProps,
+      input.recordReaderClass,
+      input.recordWriterClass,
+      input.schemaLess)
+  }
+}
+
 /**
  * The wrapper class of Hive input and output schema properties
  */
@@ -325,7 +341,8 @@ case class HiveScriptIOSchema (
     outputSerdeProps: Seq[(String, String)],
     recordReaderClass: Option[String],
     recordWriterClass: Option[String],
-    schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+    schemaLess: Boolean)
+  extends HiveInspectors {
 
   private val defaultFormat = Map(
     ("TOK_TABLEROWFORMATFIELD", "\t"),
@@ -402,52 +419,4 @@ case class HiveScriptIOSchema (
       instance
     }
   }
-
-  def inputRowFormatSQL: Option[String] =
-    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
-
-  def outputRowFormatSQL: Option[String] =
-    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
-
-  /**
-   * Get the row format specification
-   * Note:
-   * 1. Changes are needed when readerClause and writerClause are supported.
-   * 2. Changes are needed when "ESCAPED BY" is supported.
-   */
-  private def getRowFormatSQL(
-      rowFormat: Seq[(String, String)],
-      serdeClass: Option[String],
-      serdeProps: Seq[(String, String)]): Option[String] = {
-    if (schemaLess) return Some("")
-
-    val rowFormatDelimited =
-      rowFormat.map {
-        case ("TOK_TABLEROWFORMATFIELD", value) =>
-          "FIELDS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
-          "COLLECTION ITEMS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
-          "MAP KEYS TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATLINES", value) =>
-          "LINES TERMINATED BY " + value
-        case ("TOK_TABLEROWFORMATNULL", value) =>
-          "NULL DEFINED AS " + value
-        case o => return None
-      }
-
-    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
-    val serdePropsSQL =
-      if (serdeClass.nonEmpty) {
-        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
-        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
-      } else {
-        ""
-      }
-    if (rowFormat.nonEmpty) {
-      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
-    } else {
-      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2bb1399..741e3bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 4c90dbe..e352256 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e918704..7a6f1ce 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.HiveNativeCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton

http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 994dc4a..77906ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{ExplainCommand, HiveNativeCommand, SetCommand}
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
 import org.apache.spark.sql.hive.SQLBuilder

