You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/30 18:43:58 UTC

git commit: Revert "[SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive"

Repository: spark
Updated Branches:
  refs/heads/master b167a8c7e -> b64fcbd2d


Revert "[SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive"

This reverts commit 0bbe7faeffa17577ae8a33dfcd8c4c783db5c909.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b64fcbd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b64fcbd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b64fcbd2

Branch: refs/heads/master
Commit: b64fcbd2dcec3418397328399c58f98d990a54f1
Parents: b167a8c
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Sep 30 09:43:46 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Sep 30 09:43:46 2014 -0700

----------------------------------------------------------------------
 .../hive/execution/HiveCompatibilitySuite.scala |  17 --
 .../org/apache/spark/SparkHadoopWriter.scala    | 195 +++++++++++++++++
 .../org/apache/spark/sql/hive/HiveQl.scala      |   5 +
 .../hive/execution/InsertIntoHiveTable.scala    | 207 ++++++++----------
 .../spark/sql/hive/hiveWriterContainers.scala   | 217 -------------------
 ...partition-0-be33aaa7253c8f248ff3921cd7dae340 |   0
 ...partition-1-640552dd462707563fd255a713f83b41 |   0
 ...partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 |   1 -
 ...partition-3-b7f7fa7ebf666f4fee27e149d8c6961f |   0
 ...partition-4-8bdb71ad8cb3cc3026043def2525de3a |   0
 ...partition-5-c630dce438f3792e7fb0f523fbbb3e1e |   0
 ...partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf |   0
 ...partition-7-be33aaa7253c8f248ff3921cd7dae340 |   0
 .../sql/hive/execution/HiveQuerySuite.scala     | 100 +--------
 14 files changed, 299 insertions(+), 443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 35e9c99..556c984 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -220,23 +220,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    */
   override def whiteList = Seq(
     "add_part_exist",
-    "dynamic_partition_skip_default",
-    "infer_bucket_sort_dyn_part",
-    "load_dyn_part1",
-    "load_dyn_part2",
-    "load_dyn_part3",
-    "load_dyn_part4",
-    "load_dyn_part5",
-    "load_dyn_part6",
-    "load_dyn_part7",
-    "load_dyn_part8",
-    "load_dyn_part9",
-    "load_dyn_part10",
-    "load_dyn_part11",
-    "load_dyn_part12",
-    "load_dyn_part13",
-    "load_dyn_part14",
-    "load_dyn_part14_win",
     "add_part_multiple",
     "add_partition_no_whitelist",
     "add_partition_with_whitelist",

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
new file mode 100644
index 0000000..ab7862f
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.IOException
+import java.text.NumberFormat
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.io.Writable
+
+import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+
+/**
+ * Internal helper class that saves an RDD using a Hive OutputFormat.
+ * It is based on [[SparkHadoopWriter]].
+ */
+private[hive] class SparkHiveHadoopWriter(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc)
+  extends Logging
+  with SparkHadoopMapRedUtil
+  with Serializable {
+
+  private val now = new Date()
+  private val conf = new SerializableWritable(jobConf)
+
+  private var jobID = 0
+  private var splitID = 0
+  private var attemptID = 0
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null
+
+  @transient private var writer: FileSinkOperator.RecordWriter = null
+  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
+  @transient private var committer: OutputCommitter = null
+  @transient private var jobContext: JobContext = null
+  @transient private var taskContext: TaskAttemptContext = null
+
+  def preSetup() {
+    setIDs(0, 0, 0)
+    setConfParams()
+
+    val jCtxt = getJobContext()
+    getOutputCommitter().setupJob(jCtxt)
+  }
+
+
+  def setup(jobid: Int, splitid: Int, attemptid: Int) {
+    setIDs(jobid, splitid, attemptid)
+    setConfParams()
+  }
+
+  def open() {
+    val numfmt = NumberFormat.getInstance()
+    numfmt.setMinimumIntegerDigits(5)
+    numfmt.setGroupingUsed(false)
+
+    val extension = Utilities.getFileExtension(
+      conf.value,
+      fileSinkConf.getCompressed,
+      getOutputFormat())
+
+    val outputName = "part-"  + numfmt.format(splitID) + extension
+    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
+
+    getOutputCommitter().setupTask(getTaskContext())
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      path,
+      null)
+  }
+
+  def write(value: Writable) {
+    if (writer != null) {
+      writer.write(value)
+    } else {
+      throw new IOException("Writer is null, open() has not been called")
+    }
+  }
+
+  def close() {
+    // Seems the boolean value passed into close does not matter.
+    writer.close(false)
+  }
+
+  def commit() {
+    val taCtxt = getTaskContext()
+    val cmtr = getOutputCommitter()
+    if (cmtr.needsTaskCommit(taCtxt)) {
+      try {
+        cmtr.commitTask(taCtxt)
+        logInfo (taID + ": Committed")
+      } catch {
+        case e: IOException =>
+          logError("Error committing the output of task: " + taID.value, e)
+          cmtr.abortTask(taCtxt)
+          throw e
+      }
+    } else {
+      logWarning ("No need to commit output of task: " + taID.value)
+    }
+  }
+
+  def commitJob() {
+    // always ? Or if cmtr.needsTaskCommit ?
+    val cmtr = getOutputCommitter()
+    cmtr.commitJob(getJobContext())
+  }
+
+  // ********* Private Functions *********
+
+  private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
+    if (format == null) {
+      format = conf.value.getOutputFormat()
+        .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
+    }
+    format
+  }
+
+  private def getOutputCommitter(): OutputCommitter = {
+    if (committer == null) {
+      committer = conf.value.getOutputCommitter
+    }
+    committer
+  }
+
+  private def getJobContext(): JobContext = {
+    if (jobContext == null) {
+      jobContext = newJobContext(conf.value, jID.value)
+    }
+    jobContext
+  }
+
+  private def getTaskContext(): TaskAttemptContext = {
+    if (taskContext == null) {
+      taskContext =  newTaskAttemptContext(conf.value, taID.value)
+    }
+    taskContext
+  }
+
+  private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
+    jobID = jobId
+    splitID = splitId
+    attemptID = attemptId
+
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+    taID = new SerializableWritable[TaskAttemptID](
+      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+  }
+
+  private def setConfParams() {
+    conf.value.set("mapred.job.id", jID.value.toString)
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+    conf.value.set("mapred.task.id", taID.value.toString)
+    conf.value.setBoolean("mapred.task.is.map", true)
+    conf.value.setInt("mapred.task.partition", splitID)
+  }
+}
+
+private[hive] object SparkHiveHadoopWriter {
+  def createPathFromString(path: String, conf: JobConf): Path = {
+    if (path == null) {
+      throw new IllegalArgumentException("Output path is null")
+    }
+    val outputPath = new Path(path)
+    val fs = outputPath.getFileSystem(conf)
+    if (outputPath == null || fs == null) {
+      throw new IllegalArgumentException("Incorrectly formatted output path")
+    }
+    outputPath.makeQualified(fs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 4e30e6e..0aa6292 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -837,6 +837,11 @@ private[hive] object HiveQl {
           cleanIdentifier(key.toLowerCase) -> None
       }.toMap).getOrElse(Map.empty)
 
+      if (partitionKeys.values.exists(p => p.isEmpty)) {
+        throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
+          s"dynamic partitioning.")
+      }
+
       InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
 
     case a: ASTNode =>

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3d2ee01..a284a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,25 +19,27 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConversions._
 
+import java.util.{HashMap => JHashMap}
+
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive._
-import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
 
 /**
  * :: DeveloperApi ::
@@ -49,7 +51,7 @@ case class InsertIntoHiveTable(
     child: SparkPlan,
     overwrite: Boolean)
     (@transient sc: HiveContext)
-  extends UnaryNode with Command {
+  extends UnaryNode {
 
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -99,74 +101,66 @@ case class InsertIntoHiveTable(
   }
 
   def saveAsHiveFile(
-      rdd: RDD[Row],
+      rdd: RDD[Writable],
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
-      conf: SerializableWritable[JobConf],
-      writerContainer: SparkHiveWriterContainer) {
-    assert(valueClass != null, "Output value class not set")
-    conf.value.setOutputValueClass(valueClass)
-
-    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
-    assert(outputFileFormatClassName != null, "Output format class not set")
-    conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-
-    val isCompressed = conf.value.getBoolean(
-      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
-
+      conf: JobConf,
+      isCompressed: Boolean) {
+    if (valueClass == null) {
+      throw new SparkException("Output value class not set")
+    }
+    conf.setOutputValueClass(valueClass)
+    if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
+      throw new SparkException("Output format class not set")
+    }
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
+    conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      conf.value.set("mapred.output.compress", "true")
+      conf.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
     }
-    conf.value.setOutputCommitter(classOf[FileOutputCommitter])
-
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
     FileOutputFormat.setOutputPath(
-      conf.value,
-      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
-    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
+      conf,
+      SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
 
-    writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writerContainer.commitJob()
-
-    // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[Row]) {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
+    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
-      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
-      val outputData = new Array[Any](fieldOIs.length)
+    val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
+    writer.preSetup()
 
+    def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
 
-      iterator.foreach { row =>
-        var i = 0
-        while (i < fieldOIs.length) {
-          // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
-          outputData(i) = wrap(row(i), fieldOIs(i))
-          i += 1
-        }
+      writer.setup(context.stageId, context.partitionId, attemptNumber)
+      writer.open()
 
-        val writer = writerContainer.getLocalFileWriter(row)
-        writer.write(serializer.serialize(outputData, standardOI))
+      var count = 0
+      while(iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        writer.write(record)
       }
 
-      writerContainer.close()
+      writer.close()
+      writer.commit()
     }
+
+    sc.sparkContext.runJob(rdd, writeToFile _)
+    writer.commitJob()
   }
 
+  override def execute() = result
+
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -174,57 +168,50 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+  private lazy val result: RDD[Row] = {
+    val childRdd = child.execute()
+    assert(childRdd != null)
+
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    val rdd = childRdd.mapPartitions { iter =>
+      val serializer = newSerializer(fileSinkConf.getTableInfo)
+      val standardOI = ObjectInspectorUtils
+        .getStandardObjectInspector(
+          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+          ObjectInspectorCopyOption.JAVA)
+        .asInstanceOf[StructObjectInspector]
 
-    val numDynamicPartitions = partition.values.count(_.isEmpty)
-    val numStaticPartitions = partition.values.count(_.nonEmpty)
-    val partitionSpec = partition.map {
-      case (key, Some(value)) => key -> value
-      case (key, None) => key -> ""
-    }
-
-    // All partition column names in the format of "<column name 1>/<column name 2>/..."
-    val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
-    val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
-
-    // Validate partition spec if there exist any dynamic partitions
-    if (numDynamicPartitions > 0) {
-      // Report error if dynamic partitioning is not enabled
-      if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
-      }
 
-      // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 &&
-        sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
-      }
+      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+      val outputData = new Array[Any](fieldOIs.length)
+      iter.map { row =>
+        var i = 0
+        while (i < row.length) {
+          // Casts Strings to HiveVarchars when necessary.
+          outputData(i) = wrap(row(i), fieldOIs(i))
+          i += 1
+        }
 
-      // Report error if any static partition appears after a dynamic partition
-      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
-      isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ =>
-        throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        serializer.serialize(outputData, standardOI)
       }
     }
 
+    // ORC stores compression information in table properties. While, there are other formats
+    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
     val jobConf = new JobConf(sc.hiveconf)
-    val jobConfSer = new SerializableWritable(jobConf)
-
-    val writerContainer = if (numDynamicPartitions > 0) {
-      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
-    } else {
-      new SparkHiveWriterContainer(jobConf, fileSinkConf)
-    }
-
-    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
-
+    saveAsHiveFile(
+      rdd,
+      outputClass,
+      fileSinkConf,
+      jobConf,
+      sc.hiveconf.getBoolean("hive.exec.compress.output", false))
+
+    // TODO: Handle dynamic partitioning.
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
     val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
@@ -233,6 +220,10 @@ case class InsertIntoHiveTable(
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
+      val partitionSpec = partition.map {
+        case (key, Some(value)) => key -> value
+        case (key, None) => key -> "" // Should not reach here right now.
+      }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
       db.validatePartitionNameCharacters(partVals)
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -240,26 +231,14 @@ case class InsertIntoHiveTable(
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
-      if (numDynamicPartitions > 0) {
-        db.loadDynamicPartitions(
-          outputPath,
-          qualifiedTableName,
-          partitionSpec,
-          overwrite,
-          numDynamicPartitions,
-          holdDDLTime,
-          isSkewedStoreAsSubdir
-        )
-      } else {
-        db.loadPartition(
-          outputPath,
-          qualifiedTableName,
-          partitionSpec,
-          overwrite,
-          holdDDLTime,
-          inheritTableSpecs,
-          isSkewedStoreAsSubdir)
-      }
+      db.loadPartition(
+        outputPath,
+        qualifiedTableName,
+        partitionSpec,
+        overwrite,
+        holdDDLTime,
+        inheritTableSpecs,
+        isSkewedStoreAsSubdir)
     } else {
       db.loadTable(
         outputPath,
@@ -272,6 +251,6 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    Seq.empty[Row]
+    sc.sparkContext.makeRDD(Nil, 1)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
deleted file mode 100644
index a667188..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.IOException
-import java.text.NumberFormat
-import java.util.Date
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred._
-
-import org.apache.spark.sql.Row
-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on [[SparkHadoopWriter]].
- */
-private[hive] class SparkHiveWriterContainer(
-    @transient jobConf: JobConf,
-    fileSinkConf: FileSinkDesc)
-  extends Logging
-  with SparkHadoopMapRedUtil
-  with Serializable {
-
-  private val now = new Date()
-  protected val conf = new SerializableWritable(jobConf)
-
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private lazy val committer = conf.value.getOutputCommitter
-  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
-  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
-  @transient private lazy val outputFormat =
-    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
-
-  def driverSideSetup() {
-    setIDs(0, 0, 0)
-    setConfParams()
-    committer.setupJob(jobContext)
-  }
-
-  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
-    setIDs(jobId, splitId, attemptId)
-    setConfParams()
-    committer.setupTask(taskContext)
-    initWriters()
-  }
-
-  protected def getOutputName: String = {
-    val numberFormat = NumberFormat.getInstance()
-    numberFormat.setMinimumIntegerDigits(5)
-    numberFormat.setGroupingUsed(false)
-    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
-    "part-" + numberFormat.format(splitID) + extension
-  }
-
-  def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
-
-  def close() {
-    // Seems the boolean value passed into close does not matter.
-    writer.close(false)
-    commit()
-  }
-
-  def commitJob() {
-    committer.commitJob(jobContext)
-  }
-
-  protected def initWriters() {
-    // NOTE this method is executed at the executor side.
-    // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
-      Reporter.NULL)
-  }
-
-  protected def commit() {
-    if (committer.needsTaskCommit(taskContext)) {
-      try {
-        committer.commitTask(taskContext)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException =>
-          logError("Error committing the output of task: " + taID.value, e)
-          committer.abortTask(taskContext)
-          throw e
-      }
-    } else {
-      logInfo("No need to commit output of task: " + taID.value)
-    }
-  }
-
-  // ********* Private Functions *********
-
-  private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
-    jobID = jobId
-    splitID = splitId
-    attemptID = attemptId
-
-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
-    taID = new SerializableWritable[TaskAttemptID](
-      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
-  }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
-}
-
-private[hive] object SparkHiveWriterContainer {
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    val outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (outputPath == null || fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-  }
-}
-
-private[spark] class SparkHiveDynamicPartitionWriterContainer(
-    @transient jobConf: JobConf,
-    fileSinkConf: FileSinkDesc,
-    dynamicPartColNames: Array[String])
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
-
-  private val defaultPartName = jobConf.get(
-    ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
-
-  @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
-
-  override protected def initWriters(): Unit = {
-    // NOTE: This method is executed at the executor side.
-    // Actual writers are created for each dynamic partition on the fly.
-    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
-  }
-
-  override def close(): Unit = {
-    writers.values.foreach(_.close(false))
-    commit()
-  }
-
-  override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
-    val dynamicPartPath = dynamicPartColNames
-      .zip(row.takeRight(dynamicPartColNames.length))
-      .map { case (col, rawVal) =>
-        val string = String.valueOf(rawVal)
-        s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
-      }
-      .mkString
-
-    def newWriter = {
-      val newFileSinkDesc = new FileSinkDesc(
-        fileSinkConf.getDirName + dynamicPartPath,
-        fileSinkConf.getTableInfo,
-        fileSinkConf.getCompressed)
-      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
-      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
-
-      val path = {
-        val outputPath = FileOutputFormat.getOutputPath(conf.value)
-        assert(outputPath != null, "Undefined job output-path")
-        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-        new Path(workPath, getOutputName)
-      }
-
-      HiveFileFormatUtils.getHiveRecordWriter(
-        conf.value,
-        fileSinkConf.getTableInfo,
-        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-        newFileSinkDesc,
-        path,
-        Reporter.NULL)
-    }
-
-    writers.getOrElseUpdate(dynamicPartPath, newWriter)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
deleted file mode 100644
index 573541a..0000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
+++ /dev/null
@@ -1 +0,0 @@
-0

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/spark/blob/b64fcbd2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 5d743a5..2da8a6f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.hive.execution
 
 import scala.util.Try
 
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-
-import org.apache.spark.SparkException
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -383,7 +380,7 @@ class HiveQuerySuite extends HiveComparisonTest {
 
   def isExplanation(result: SchemaRDD) = {
     val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
-    explanation.contains("== Physical Plan ==")
+    explanation.exists(_ == "== Physical Plan ==")
   }
 
   test("SPARK-1704: Explain commands as a SchemaRDD") {
@@ -571,91 +568,6 @@ class HiveQuerySuite extends HiveComparisonTest {
   case class LogEntry(filename: String, message: String)
   case class LogFile(name: String)
 
-  createQueryTest("dynamic_partition",
-    """
-      |DROP TABLE IF EXISTS dynamic_part_table;
-      |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
-      |
-      |SET hive.exec.dynamic.partition.mode=nonstrict;
-      |
-      |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
-      |SELECT 1, 1, 1 FROM src WHERE key=150;
-      |
-      |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
-      |SELECT 1, NULL, 1 FROM src WHERE key=150;
-      |
-      |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
-      |SELECT 1, 1, NULL FROM src WHERE key=150;
-      |
-      |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2)
-      |SELECT 1, NULL, NULL FROM src WHERE key=150;
-      |
-      |DROP TABLE IF EXISTS dynamic_part_table;
-    """.stripMargin)
-
-  test("Dynamic partition folder layout") {
-    sql("DROP TABLE IF EXISTS dynamic_part_table")
-    sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)")
-    sql("SET hive.exec.dynamic.partition.mode=nonstrict")
-
-    val data = Map(
-      Seq("1", "1") -> 1,
-      Seq("1", "NULL") -> 2,
-      Seq("NULL", "1") -> 3,
-      Seq("NULL", "NULL") -> 4)
-
-    data.foreach { case (parts, value) =>
-      sql(
-        s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
-           |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150
-         """.stripMargin)
-
-      val partFolder = Seq("partcol1", "partcol2")
-        .zip(parts)
-        .map { case (k, v) =>
-          if (v == "NULL") {
-            s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}"
-          } else {
-            s"$k=$v"
-          }
-        }
-        .mkString("/")
-
-      // Loads partition data to a temporary table to verify contents
-      val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
-
-      sql("DROP TABLE IF EXISTS dp_verify")
-      sql("CREATE TABLE dp_verify(intcol INT)")
-      sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify")
-
-      assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value)))
-    }
-  }
-
-  test("Partition spec validation") {
-    sql("DROP TABLE IF EXISTS dp_test")
-    sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)")
-    sql("SET hive.exec.dynamic.partition.mode=strict")
-
-    // Should throw when using strict dynamic partition mode without any static partition
-    intercept[SparkException] {
-      sql(
-        """INSERT INTO TABLE dp_test PARTITION(dp)
-          |SELECT key, value, key % 5 FROM src
-        """.stripMargin)
-    }
-
-    sql("SET hive.exec.dynamic.partition.mode=nonstrict")
-
-    // Should throw when a static partition appears after a dynamic partition
-    intercept[SparkException] {
-      sql(
-        """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
-          |SELECT key, value, key % 5 FROM src
-        """.stripMargin)
-    }
-  }
-
   test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
     sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
     sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
@@ -713,27 +625,27 @@ class HiveQuerySuite extends HiveComparisonTest {
     assert(sql("SET").collect().size == 0)
 
     assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey=$testVal"))
+      collectResults(hql(s"SET $testKey=$testVal"))
     }
 
     assert(hiveconf.get(testKey, "") == testVal)
     assertResult(Set(testKey -> testVal)) {
-      collectResults(sql("SET"))
+      collectResults(hql("SET"))
     }
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
     assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
-      collectResults(sql("SET"))
+      collectResults(hql("SET"))
     }
 
     // "set key"
     assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey"))
+      collectResults(hql(s"SET $testKey"))
     }
 
     assertResult(Set(nonexistentKey -> "<undefined>")) {
-      collectResults(sql(s"SET $nonexistentKey"))
+      collectResults(hql(s"SET $nonexistentKey"))
     }
 
     // Assert that sql() should have the same effects as sql() by repeating the above using sql().


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org