Posted to commits@spark.apache.org by ka...@apache.org on 2014/09/30 00:52:04 UTC

git commit: [SPARK-3007][SQL] Add dynamic partition support to Spark SQL Hive

Repository: spark
Updated Branches:
  refs/heads/master e43c72fe0 -> 0bbe7faef


[SPARK-3007][SQL] Add dynamic partition support to Spark SQL Hive

A new PR based on the new master. The changes are the same as https://github.com/apache/spark/pull/1919.
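
For context, dynamic partitioning lets an INSERT leave some or all partition columns unbound in the PARTITION clause and derive their values from the trailing columns of the SELECT. Below is a minimal usage sketch (editorial, not part of this commit): the table name is illustrative, `sc` is assumed to be an existing SparkContext, `src` is the usual Hive test table, and HiveQL is assumed to be the active dialect for `sql` calls, as in the updated HiveQuerySuite.

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)  // assumes an existing SparkContext `sc`

    // With no static partition value, dynamic partitioning requires nonstrict mode.
    hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    hiveContext.sql(
      "CREATE TABLE dyn_part_demo (value INT) PARTITIONED BY (part1 INT, part2 INT)")

    // Both partition columns are left unbound in the PARTITION clause, so their
    // values come from the last two columns of the SELECT (dynamic partitions).
    hiveContext.sql(
      """INSERT INTO TABLE dyn_part_demo PARTITION (part1, part2)
        |SELECT key, 1, key % 5 FROM src
      """.stripMargin)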

Author: baishuo(白硕) <vc...@hotmail.com>
Author: baishuo <vc...@hotmail.com>
Author: Cheng Lian <li...@gmail.com>

Closes #2226 from baishuo/patch-3007 and squashes the following commits:

e69ce88 [Cheng Lian] Adds tests to verify dynamic partitioning folder layout
b20a3dc [Cheng Lian] Addresses @yhuai's comments
096bbbc [baishuo(白硕)] Merge pull request #1 from liancheng/refactor-dp
1093c20 [Cheng Lian] Adds more tests
5004542 [Cheng Lian] Minor refactoring
fae9eff [Cheng Lian] Refactors InsertIntoHiveTable to a Command
528e84c [Cheng Lian] Fixes typo in test name, regenerated golden answer files
c464b26 [Cheng Lian] Refactors dynamic partitioning support
5033928 [baishuo] pass check style
2201c75 [baishuo] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name
b47c9bf [baishuo] modify according micheal's advice
c3ab36d [baishuo] modify for some bad indentation
7ce2d9f [baishuo] modify code to pass scala style checks
37c1c43 [baishuo] delete a empty else branch
66e33fc [baishuo] do a little modify
88d0110 [baishuo] update file after test
a3961d9 [baishuo(白硕)] Update Cast.scala
f7467d0 [baishuo(白硕)] Update InsertIntoHiveTable.scala
c1a59dd [baishuo(白硕)] Update Cast.scala
0e18496 [baishuo(白硕)] Update HiveQuerySuite.scala
60f70aa [baishuo(白硕)] Update InsertIntoHiveTable.scala
0a50db9 [baishuo(白硕)] Update HiveCompatibilitySuite.scala
491c7d0 [baishuo(白硕)] Update InsertIntoHiveTable.scala
a2374a8 [baishuo(白硕)] Update InsertIntoHiveTable.scala
701a814 [baishuo(白硕)] Update SparkHadoopWriter.scala
dc24c41 [baishuo(白硕)] Update HiveQl.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bbe7fae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bbe7fae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bbe7fae

Branch: refs/heads/master
Commit: 0bbe7faeffa17577ae8a33dfcd8c4c783db5c909
Parents: e43c72f
Author: baishuo(白硕) <vc...@hotmail.com>
Authored: Mon Sep 29 15:51:55 2014 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Mon Sep 29 15:51:55 2014 -0700

----------------------------------------------------------------------
 .../hive/execution/HiveCompatibilitySuite.scala |  17 ++
 .../org/apache/spark/SparkHadoopWriter.scala    | 195 -----------------
 .../org/apache/spark/sql/hive/HiveQl.scala      |   5 -
 .../hive/execution/InsertIntoHiveTable.scala    | 207 ++++++++++--------
 .../spark/sql/hive/hiveWriterContainers.scala   | 217 +++++++++++++++++++
 ...partition-0-be33aaa7253c8f248ff3921cd7dae340 |   0
 ...partition-1-640552dd462707563fd255a713f83b41 |   0
 ...partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 |   1 +
 ...partition-3-b7f7fa7ebf666f4fee27e149d8c6961f |   0
 ...partition-4-8bdb71ad8cb3cc3026043def2525de3a |   0
 ...partition-5-c630dce438f3792e7fb0f523fbbb3e1e |   0
 ...partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf |   0
 ...partition-7-be33aaa7253c8f248ff3921cd7dae340 |   0
 .../sql/hive/execution/HiveQuerySuite.scala     | 100 ++++++++-
 14 files changed, 443 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 556c984..35e9c99 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -220,6 +220,23 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    */
   override def whiteList = Seq(
     "add_part_exist",
+    "dynamic_partition_skip_default",
+    "infer_bucket_sort_dyn_part",
+    "load_dyn_part1",
+    "load_dyn_part2",
+    "load_dyn_part3",
+    "load_dyn_part4",
+    "load_dyn_part5",
+    "load_dyn_part6",
+    "load_dyn_part7",
+    "load_dyn_part8",
+    "load_dyn_part9",
+    "load_dyn_part10",
+    "load_dyn_part11",
+    "load_dyn_part12",
+    "load_dyn_part13",
+    "load_dyn_part14",
+    "load_dyn_part14_win",
     "add_part_multiple",
     "add_partition_no_whitelist",
     "add_partition_with_whitelist",

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
deleted file mode 100644
index ab7862f..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.IOException
-import java.text.NumberFormat
-import java.util.Date
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.io.Writable
-
-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on [[SparkHadoopWriter]].
- */
-private[hive] class SparkHiveHadoopWriter(
-    @transient jobConf: JobConf,
-    fileSinkConf: FileSinkDesc)
-  extends Logging
-  with SparkHadoopMapRedUtil
-  with Serializable {
-
-  private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
-
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
-
-  def preSetup() {
-    setIDs(0, 0, 0)
-    setConfParams()
-
-    val jCtxt = getJobContext()
-    getOutputCommitter().setupJob(jCtxt)
-  }
-
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
-    setConfParams()
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-"  + numfmt.format(splitID) + extension
-    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
-
-    getOutputCommitter().setupTask(getTaskContext())
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      path,
-      null)
-  }
-
-  def write(value: Writable) {
-    if (writer != null) {
-      writer.write(value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
-  }
-
-  def close() {
-    // Seems the boolean value passed into close does not matter.
-    writer.close(false)
-  }
-
-  def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-    if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException =>
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
-      }
-    } else {
-      logWarning ("No need to commit output of task: " + taID.value)
-    }
-  }
-
-  def commitJob() {
-    // always ? Or if cmtr.needsTaskCommit ?
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
-  }
-
-  // ********* Private Functions *********
-
-  private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
-    }
-    format
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) {
-      jobContext = newJobContext(conf.value, jID.value)
-    }
-    jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext =  newTaskAttemptContext(conf.value, taID.value)
-    }
-    taskContext
-  }
-
-  private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
-    jobID = jobId
-    splitID = splitId
-    attemptID = attemptId
-
-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
-    taID = new SerializableWritable[TaskAttemptID](
-      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
-  }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
-}
-
-private[hive] object SparkHiveHadoopWriter {
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    val outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (outputPath == null || fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath.makeQualified(fs)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0aa6292..4e30e6e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -837,11 +837,6 @@ private[hive] object HiveQl {
           cleanIdentifier(key.toLowerCase) -> None
       }.toMap).getOrElse(Map.empty)
 
-      if (partitionKeys.values.exists(p => p.isEmpty)) {
-        throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
-          s"dynamic partitioning.")
-      }
-
       InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
 
     case a: ASTNode =>

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index a284a91..3d2ee01 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,27 +19,25 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConversions._
 
-import java.util.{HashMap => JHashMap}
-
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
-import org.apache.hadoop.io.Writable
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
+import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
+import org.apache.spark.sql.hive._
+import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 
 /**
  * :: DeveloperApi ::
@@ -51,7 +49,7 @@ case class InsertIntoHiveTable(
     child: SparkPlan,
     overwrite: Boolean)
     (@transient sc: HiveContext)
-  extends UnaryNode {
+  extends UnaryNode with Command {
 
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -101,66 +99,74 @@ case class InsertIntoHiveTable(
   }
 
   def saveAsHiveFile(
-      rdd: RDD[Writable],
+      rdd: RDD[Row],
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
-      conf: JobConf,
-      isCompressed: Boolean) {
-    if (valueClass == null) {
-      throw new SparkException("Output value class not set")
-    }
-    conf.setOutputValueClass(valueClass)
-    if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
-      throw new SparkException("Output format class not set")
-    }
-    // Doesn't work in Scala 2.9 due to what may be a generics bug
-    // TODO: Should we uncomment this for Scala 2.10?
-    // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+      conf: SerializableWritable[JobConf],
+      writerContainer: SparkHiveWriterContainer) {
+    assert(valueClass != null, "Output value class not set")
+    conf.value.setOutputValueClass(valueClass)
+
+    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
+    assert(outputFileFormatClassName != null, "Output format class not set")
+    conf.value.set("mapred.output.format.class", outputFileFormatClassName)
+
+    val isCompressed = conf.value.getBoolean(
+      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      conf.set("mapred.output.compress", "true")
+      conf.value.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(
-      conf,
-      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+    conf.value.setOutputCommitter(classOf[FileOutputCommitter])
 
+    FileOutputFormat.setOutputPath(
+      conf.value,
+      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
-    val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
-    writer.preSetup()
+    writerContainer.driverSideSetup()
+    sc.sparkContext.runJob(rdd, writeToFile _)
+    writerContainer.commitJob()
+
+    // Note that this function is executed on executor side
+    def writeToFile(context: TaskContext, iterator: Iterator[Row]) {
+      val serializer = newSerializer(fileSinkConf.getTableInfo)
+      val standardOI = ObjectInspectorUtils
+        .getStandardObjectInspector(
+          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+          ObjectInspectorCopyOption.JAVA)
+        .asInstanceOf[StructObjectInspector]
+
+      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+      val outputData = new Array[Any](fieldOIs.length)
 
-    def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
 
-      writer.setup(context.stageId, context.partitionId, attemptNumber)
-      writer.open()
+      iterator.foreach { row =>
+        var i = 0
+        while (i < fieldOIs.length) {
+          // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
+          outputData(i) = wrap(row(i), fieldOIs(i))
+          i += 1
+        }
 
-      var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record)
+        val writer = writerContainer.getLocalFileWriter(row)
+        writer.write(serializer.serialize(outputData, standardOI))
       }
 
-      writer.close()
-      writer.commit()
+      writerContainer.close()
     }
-
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writer.commitJob()
   }
 
-  override def execute() = result
-
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -168,50 +174,57 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  private lazy val result: RDD[Row] = {
-    val childRdd = child.execute()
-    assert(childRdd != null)
-
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val rdd = childRdd.mapPartitions { iter =>
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
 
+    val numDynamicPartitions = partition.values.count(_.isEmpty)
+    val numStaticPartitions = partition.values.count(_.nonEmpty)
+    val partitionSpec = partition.map {
+      case (key, Some(value)) => key -> value
+      case (key, None) => key -> ""
+    }
 
-      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
-      val outputData = new Array[Any](fieldOIs.length)
-      iter.map { row =>
-        var i = 0
-        while (i < row.length) {
-          // Casts Strings to HiveVarchars when necessary.
-          outputData(i) = wrap(row(i), fieldOIs(i))
-          i += 1
-        }
+    // All partition column names in the format of "<column name 1>/<column name 2>/..."
+    val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
+    val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
 
-        serializer.serialize(outputData, standardOI)
+    // Validate partition spec if there exist any dynamic partitions
+    if (numDynamicPartitions > 0) {
+      // Report error if dynamic partitioning is not enabled
+      if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+      }
+
+      // Report error if dynamic partition strict mode is on but no static partition is found
+      if (numStaticPartitions == 0 &&
+        sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+      }
+
+      // Report error if any static partition appears after a dynamic partition
+      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+      isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ =>
+        throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
       }
     }
 
-    // ORC stores compression information in table properties. While, there are other formats
-    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
     val jobConf = new JobConf(sc.hiveconf)
-    saveAsHiveFile(
-      rdd,
-      outputClass,
-      fileSinkConf,
-      jobConf,
-      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
-
-    // TODO: Handle dynamic partitioning.
+    val jobConfSer = new SerializableWritable(jobConf)
+
+    val writerContainer = if (numDynamicPartitions > 0) {
+      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
+      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+    } else {
+      new SparkHiveWriterContainer(jobConf, fileSinkConf)
+    }
+
+    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
+
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
     val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
@@ -220,10 +233,6 @@ case class InsertIntoHiveTable(
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
-      val partitionSpec = partition.map {
-        case (key, Some(value)) => key -> value
-        case (key, None) => key -> "" // Should not reach here right now.
-      }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
       db.validatePartitionNameCharacters(partVals)
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -231,14 +240,26 @@ case class InsertIntoHiveTable(
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
-      db.loadPartition(
-        outputPath,
-        qualifiedTableName,
-        partitionSpec,
-        overwrite,
-        holdDDLTime,
-        inheritTableSpecs,
-        isSkewedStoreAsSubdir)
+      if (numDynamicPartitions > 0) {
+        db.loadDynamicPartitions(
+          outputPath,
+          qualifiedTableName,
+          partitionSpec,
+          overwrite,
+          numDynamicPartitions,
+          holdDDLTime,
+          isSkewedStoreAsSubdir
+        )
+      } else {
+        db.loadPartition(
+          outputPath,
+          qualifiedTableName,
+          partitionSpec,
+          overwrite,
+          holdDDLTime,
+          inheritTableSpecs,
+          isSkewedStoreAsSubdir)
+      }
     } else {
       db.loadTable(
         outputPath,
@@ -251,6 +272,6 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    sc.sparkContext.makeRDD(Nil, 1)
+    Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
new file mode 100644
index 0000000..a667188
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.IOException
+import java.text.NumberFormat
+import java.util.Date
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+
+/**
+ * Internal helper class that saves an RDD using a Hive OutputFormat.
+ * It is based on [[SparkHadoopWriter]].
+ */
+private[hive] class SparkHiveWriterContainer(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc)
+  extends Logging
+  with SparkHadoopMapRedUtil
+  with Serializable {
+
+  private val now = new Date()
+  protected val conf = new SerializableWritable(jobConf)
+
+  private var jobID = 0
+  private var splitID = 0
+  private var attemptID = 0
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null
+
+  @transient private var writer: FileSinkOperator.RecordWriter = null
+  @transient private lazy val committer = conf.value.getOutputCommitter
+  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient private lazy val outputFormat =
+    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
+
+  def driverSideSetup() {
+    setIDs(0, 0, 0)
+    setConfParams()
+    committer.setupJob(jobContext)
+  }
+
+  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
+    setIDs(jobId, splitId, attemptId)
+    setConfParams()
+    committer.setupTask(taskContext)
+    initWriters()
+  }
+
+  protected def getOutputName: String = {
+    val numberFormat = NumberFormat.getInstance()
+    numberFormat.setMinimumIntegerDigits(5)
+    numberFormat.setGroupingUsed(false)
+    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
+    "part-" + numberFormat.format(splitID) + extension
+  }
+
+  def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+
+  def close() {
+    // Seems the boolean value passed into close does not matter.
+    writer.close(false)
+    commit()
+  }
+
+  def commitJob() {
+    committer.commitJob(jobContext)
+  }
+
+  protected def initWriters() {
+    // NOTE this method is executed at the executor side.
+    // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
+      Reporter.NULL)
+  }
+
+  protected def commit() {
+    if (committer.needsTaskCommit(taskContext)) {
+      try {
+        committer.commitTask(taskContext)
+        logInfo (taID + ": Committed")
+      } catch {
+        case e: IOException =>
+          logError("Error committing the output of task: " + taID.value, e)
+          committer.abortTask(taskContext)
+          throw e
+      }
+    } else {
+      logInfo("No need to commit output of task: " + taID.value)
+    }
+  }
+
+  // ********* Private Functions *********
+
+  private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
+    jobID = jobId
+    splitID = splitId
+    attemptID = attemptId
+
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+    taID = new SerializableWritable[TaskAttemptID](
+      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+  }
+
+  private def setConfParams() {
+    conf.value.set("mapred.job.id", jID.value.toString)
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+    conf.value.set("mapred.task.id", taID.value.toString)
+    conf.value.setBoolean("mapred.task.is.map", true)
+    conf.value.setInt("mapred.task.partition", splitID)
+  }
+}
+
+private[hive] object SparkHiveWriterContainer {
+  def createPathFromString(path: String, conf: JobConf): Path = {
+    if (path == null) {
+      throw new IllegalArgumentException("Output path is null")
+    }
+    val outputPath = new Path(path)
+    val fs = outputPath.getFileSystem(conf)
+    if (outputPath == null || fs == null) {
+      throw new IllegalArgumentException("Incorrectly formatted output path")
+    }
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+}
+
+private[spark] class SparkHiveDynamicPartitionWriterContainer(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc,
+    dynamicPartColNames: Array[String])
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+
+  private val defaultPartName = jobConf.get(
+    ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
+
+  @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
+
+  override protected def initWriters(): Unit = {
+    // NOTE: This method is executed at the executor side.
+    // Actual writers are created for each dynamic partition on the fly.
+    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+  }
+
+  override def close(): Unit = {
+    writers.values.foreach(_.close(false))
+    commit()
+  }
+
+  override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+    val dynamicPartPath = dynamicPartColNames
+      .zip(row.takeRight(dynamicPartColNames.length))
+      .map { case (col, rawVal) =>
+        val string = String.valueOf(rawVal)
+        s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
+      }
+      .mkString
+
+    def newWriter = {
+      val newFileSinkDesc = new FileSinkDesc(
+        fileSinkConf.getDirName + dynamicPartPath,
+        fileSinkConf.getTableInfo,
+        fileSinkConf.getCompressed)
+      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
+      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+
+      val path = {
+        val outputPath = FileOutputFormat.getOutputPath(conf.value)
+        assert(outputPath != null, "Undefined job output-path")
+        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+        new Path(workPath, getOutputName)
+      }
+
+      HiveFileFormatUtils.getHiveRecordWriter(
+        conf.value,
+        fileSinkConf.getTableInfo,
+        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+        newFileSinkDesc,
+        path,
+        Reporter.NULL)
+    }
+
+    writers.getOrElseUpdate(dynamicPartPath, newWriter)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
@@ -0,0 +1 @@
+0

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/0bbe7fae/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2da8a6f..5d743a5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.hive.execution
 
 import scala.util.Try
 
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+
+import org.apache.spark.SparkException
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -380,7 +383,7 @@ class HiveQuerySuite extends HiveComparisonTest {
 
   def isExplanation(result: SchemaRDD) = {
     val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
-    explanation.exists(_ == "== Physical Plan ==")
+    explanation.contains("== Physical Plan ==")
   }
 
   test("SPARK-1704: Explain commands as a SchemaRDD") {
@@ -568,6 +571,91 @@ class HiveQuerySuite extends HiveComparisonTest {
   case class LogEntry(filename: String, message: String)
   case class LogFile(name: String)
 
+  createQueryTest("dynamic_partition",
+    """
+      |DROP TABLE IF EXISTS dynamic_part_table;
+      |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
+      |
+      |SET hive.exec.dynamic.partition.mode=nonstrict;
+      |
+      |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+      |SELECT 1, 1, 1 FROM src WHERE key=150;
+      |
+      |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+      |SELECT 1, NULL, 1 FROM src WHERE key=150;
+      |
+      |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+      |SELECT 1, 1, NULL FROM src WHERE key=150;
+      |
+      |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2)
+      |SELECT 1, NULL, NULL FROM src WHERE key=150;
+      |
+      |DROP TABLE IF EXISTS dynamic_part_table;
+    """.stripMargin)
+
+  test("Dynamic partition folder layout") {
+    sql("DROP TABLE IF EXISTS dynamic_part_table")
+    sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)")
+    sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+    val data = Map(
+      Seq("1", "1") -> 1,
+      Seq("1", "NULL") -> 2,
+      Seq("NULL", "1") -> 3,
+      Seq("NULL", "NULL") -> 4)
+
+    data.foreach { case (parts, value) =>
+      sql(
+        s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+           |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150
+         """.stripMargin)
+
+      val partFolder = Seq("partcol1", "partcol2")
+        .zip(parts)
+        .map { case (k, v) =>
+          if (v == "NULL") {
+            s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}"
+          } else {
+            s"$k=$v"
+          }
+        }
+        .mkString("/")
+
+      // Loads partition data to a temporary table to verify contents
+      val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
+
+      sql("DROP TABLE IF EXISTS dp_verify")
+      sql("CREATE TABLE dp_verify(intcol INT)")
+      sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify")
+
+      assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value)))
+    }
+  }
+
+  test("Partition spec validation") {
+    sql("DROP TABLE IF EXISTS dp_test")
+    sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)")
+    sql("SET hive.exec.dynamic.partition.mode=strict")
+
+    // Should throw when using strict dynamic partition mode without any static partition
+    intercept[SparkException] {
+      sql(
+        """INSERT INTO TABLE dp_test PARTITION(dp)
+          |SELECT key, value, key % 5 FROM src
+        """.stripMargin)
+    }
+
+    sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+    // Should throw when a static partition appears after a dynamic partition
+    intercept[SparkException] {
+      sql(
+        """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
+          |SELECT key, value, key % 5 FROM src
+        """.stripMargin)
+    }
+  }
+
   test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
     sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
     sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
@@ -625,27 +713,27 @@ class HiveQuerySuite extends HiveComparisonTest {
     assert(sql("SET").collect().size == 0)
 
     assertResult(Set(testKey -> testVal)) {
-      collectResults(hql(s"SET $testKey=$testVal"))
+      collectResults(sql(s"SET $testKey=$testVal"))
     }
 
     assert(hiveconf.get(testKey, "") == testVal)
     assertResult(Set(testKey -> testVal)) {
-      collectResults(hql("SET"))
+      collectResults(sql("SET"))
     }
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
     assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
-      collectResults(hql("SET"))
+      collectResults(sql("SET"))
     }
 
     // "set key"
     assertResult(Set(testKey -> testVal)) {
-      collectResults(hql(s"SET $testKey"))
+      collectResults(sql(s"SET $testKey"))
     }
 
     assertResult(Set(nonexistentKey -> "<undefined>")) {
-      collectResults(hql(s"SET $nonexistentKey"))
+      collectResults(sql(s"SET $nonexistentKey"))
     }
 
     // Assert that sql() should have the same effects as sql() by repeating the above using sql().

