You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/21 02:11:54 UTC

spark git commit: [SPARK-8968][SQL] external sort by the partition columns when dynamic partitioning to optimize the memory overhead

Repository: spark
Updated Branches:
  refs/heads/master b362239df -> 015c8efb3


[SPARK-8968][SQL] external sort by the partition columns when dynamic partitioning to optimize the memory overhead

Currently, the hash-based writer used for dynamic partitioning performs poorly on large data sets: it produces many small files and causes high GC overhead. This patch performs an external sort on the partition columns first, so that only one writer needs to be open at a time.

before this patch:
![gc](https://cloud.githubusercontent.com/assets/7018048/9149788/edc48c6e-3dec-11e5-828c-9995b56e4d65.PNG)

after this patch:
![gc-optimize-externalsort](https://cloud.githubusercontent.com/assets/7018048/9149794/60f80c9c-3ded-11e5-8a56-7ae18ddc7a2f.png)

Author: wangfei <wa...@126.com>
Author: scwf <wa...@huawei.com>

Closes #7336 from scwf/dynamic-optimize-basedon-apachespark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/015c8efb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/015c8efb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/015c8efb

Branch: refs/heads/master
Commit: 015c8efb3774c57be6f3fee5a454622879cab1ec
Parents: b362239
Author: wangfei <wa...@126.com>
Authored: Wed Jan 20 17:11:52 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 20 17:11:52 2016 -0800

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  69 ++-----
 .../spark/sql/hive/hiveWriterContainers.scala   | 196 ++++++++++++++-----
 2 files changed, 166 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/015c8efb/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b02ace7..feb133d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,20 +24,16 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf
 
 private[hive]
@@ -46,19 +42,12 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     child: SparkPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
+    ifNotExists: Boolean) extends UnaryNode {
 
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
-  private def newSerializer(tableDesc: TableDesc): Serializer = {
-    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
-    serializer
-  }
-
   def output: Seq[Attribute] = Seq.empty
 
   private def saveAsHiveFile(
@@ -78,44 +67,10 @@ case class InsertIntoHiveTable(
       conf.value,
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
-
     writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
+    sc.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
 
-    // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
-
-      val fieldOIs = standardOI.getAllStructFieldRefs.asScala
-        .map(_.getFieldObjectInspector).toArray
-      val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
-      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
-      val outputData = new Array[Any](fieldOIs.length)
-
-      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-      val proj = FromUnsafeProjection(child.schema)
-      iterator.foreach { row =>
-        var i = 0
-        val safeRow = proj(row)
-        while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
-          i += 1
-        }
-
-        writerContainer
-          .getLocalFileWriter(safeRow, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
-      }
-
-      writerContainer.close()
-    }
   }
 
   /**
@@ -194,11 +149,21 @@ case class InsertIntoHiveTable(
 
     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
-      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+      new SparkHiveDynamicPartitionWriterContainer(
+        jobConf,
+        fileSinkConf,
+        dynamicPartColNames,
+        child.output,
+        table)
     } else {
-      new SparkHiveWriterContainer(jobConf, fileSinkConf)
+      new SparkHiveWriterContainer(
+        jobConf,
+        fileSinkConf,
+        child.output,
+        table)
     }
 
+    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/015c8efb/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 22182ba..e9e08db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.text.NumberFormat
 import java.util.Date
 
-import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.FileUtils
@@ -28,14 +28,18 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.TaskType
 
-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableJobConf
@@ -45,9 +49,13 @@ import org.apache.spark.util.SerializableJobConf
  * It is based on [[SparkHadoopWriter]].
  */
 private[hive] class SparkHiveWriterContainer(
-    jobConf: JobConf,
-    fileSinkConf: FileSinkDesc)
-  extends Logging with Serializable {
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc,
+    inputSchema: Seq[Attribute],
+    table: MetastoreRelation)
+  extends Logging
+  with HiveInspectors
+  with Serializable {
 
   private val now = new Date()
   private val tableDesc: TableDesc = fileSinkConf.getTableInfo
@@ -93,14 +101,12 @@ private[hive] class SparkHiveWriterContainer(
     "part-" + numberFormat.format(splitID) + extension
   }
 
-  def getLocalFileWriter(row: InternalRow, schema: StructType): FileSinkOperator.RecordWriter = {
-    writer
-  }
-
   def close() {
     // Seems the boolean value passed into close does not matter.
-    writer.close(false)
-    commit()
+    if (writer != null) {
+      writer.close(false)
+      commit()
+    }
   }
 
   def commitJob() {
@@ -123,6 +129,13 @@ private[hive] class SparkHiveWriterContainer(
     SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
   }
 
+  def abortTask(): Unit = {
+    if (committer != null) {
+      committer.abortTask(taskContext)
+    }
+    logError(s"Task attempt $taskContext aborted.")
+  }
+
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -140,6 +153,44 @@ private[hive] class SparkHiveWriterContainer(
     conf.value.setBoolean("mapred.task.is.map", true)
     conf.value.setInt("mapred.task.partition", splitID)
   }
+
+  def newSerializer(tableDesc: TableDesc): Serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
+
+  protected def prepareForWrite() = {
+    val serializer = newSerializer(fileSinkConf.getTableInfo)
+    val standardOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
+    val dataTypes = inputSchema.map(_.dataType)
+    val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
+    val outputData = new Array[Any](fieldOIs.length)
+    (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData)
+  }
+
+  // this function is executed on executor side
+  def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+    val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
+    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+
+    iterator.foreach { row =>
+      var i = 0
+      while (i < fieldOIs.length) {
+        outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+        i += 1
+      }
+      writer.write(serializer.serialize(outputData, standardOI))
+    }
+
+    close()
+  }
 }
 
 private[hive] object SparkHiveWriterContainer {
@@ -163,25 +214,22 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
-    dynamicPartColNames: Array[String])
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+    dynamicPartColNames: Array[String],
+    inputSchema: Seq[Attribute],
+    table: MetastoreRelation)
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {
 
   import SparkHiveDynamicPartitionWriterContainer._
 
   private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultStrVal)
 
-  @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
-
   override protected def initWriters(): Unit = {
-    // NOTE: This method is executed at the executor side.
-    // Actual writers are created for each dynamic partition on the fly.
-    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+    // do nothing
   }
 
   override def close(): Unit = {
-    writers.values.foreach(_.close(false))
-    commit()
+    // do nothing
   }
 
   override def commitJob(): Unit = {
@@ -198,33 +246,89 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }
 
-  override def getLocalFileWriter(row: InternalRow, schema: StructType)
-    : FileSinkOperator.RecordWriter = {
-    def convertToHiveRawString(col: String, value: Any): String = {
-      val raw = String.valueOf(value)
-      schema(col).dataType match {
-        case DateType => DateTimeUtils.dateToString(raw.toInt)
-        case _: DecimalType => BigDecimal(raw).toString()
-        case _ => raw
-      }
+  // this function is executed on executor side
+  override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+    val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
+    executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+
+    val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length)
+    val dataOutput = inputSchema.take(fieldOIs.length)
+    // Returns the partition key given an input row
+    val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema)
+    // Returns the data columns to be written given an input row
+    val getOutputRow = UnsafeProjection.create(dataOutput, inputSchema)
+
+    val fun: AnyRef = (pathString: String) => FileUtils.escapePathName(pathString, defaultPartName)
+    // Expressions that given a partition key build a string like: col1=val/col2=val/...
+    val partitionStringExpression = partitionOutput.zipWithIndex.flatMap { case (c, i) =>
+      val escaped =
+        ScalaUDF(fun, StringType, Seq(Cast(c, StringType)), Seq(StringType))
+      val str = If(IsNull(c), Literal(defaultPartName), escaped)
+      val partitionName = Literal(dynamicPartColNames(i) + "=") :: str :: Nil
+      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
     }
 
-    val nonDynamicPartLen = row.numFields - dynamicPartColNames.length
-    val dynamicPartPath = dynamicPartColNames.zipWithIndex.map { case (colName, i) =>
-      val rawVal = row.get(nonDynamicPartLen + i, schema(colName).dataType)
-      val string = if (rawVal == null) null else convertToHiveRawString(colName, rawVal)
-      val colString =
-        if (string == null || string.isEmpty) {
-          defaultPartName
-        } else {
-          FileUtils.escapePathName(string, defaultPartName)
-        }
-      s"/$colName=$colString"
-    }.mkString
+    // Returns the partition path given a partition key.
+    val getPartitionString =
+      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionOutput)
+
+    // If anything below fails, we should abort the task.
+    try {
+      val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter(
+        StructType.fromAttributes(partitionOutput),
+        StructType.fromAttributes(dataOutput),
+        SparkEnv.get.blockManager,
+        TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+      while (iterator.hasNext) {
+        val inputRow = iterator.next()
+        val currentKey = getPartitionKey(inputRow)
+        sorter.insertKV(currentKey, getOutputRow(inputRow))
+      }
 
-    def newWriter(): FileSinkOperator.RecordWriter = {
+      logInfo(s"Sorting complete. Writing out partition files one at a time.")
+      val sortedIterator = sorter.sortedIterator()
+      var currentKey: InternalRow = null
+      var currentWriter: FileSinkOperator.RecordWriter = null
+      try {
+        while (sortedIterator.next()) {
+          if (currentKey != sortedIterator.getKey) {
+            if (currentWriter != null) {
+              currentWriter.close(false)
+            }
+            currentKey = sortedIterator.getKey.copy()
+            logDebug(s"Writing partition: $currentKey")
+            currentWriter = newOutputWriter(currentKey)
+          }
+
+          var i = 0
+          while (i < fieldOIs.length) {
+            outputData(i) = if (sortedIterator.getValue.isNullAt(i)) {
+              null
+            } else {
+              wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i)))
+            }
+            i += 1
+          }
+          currentWriter.write(serializer.serialize(outputData, standardOI))
+        }
+      } finally {
+        if (currentWriter != null) {
+          currentWriter.close(false)
+        }
+      }
+      commit()
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        abortTask()
+        throw new SparkException("Task failed while writing rows.", cause)
+    }
+    /** Open and returns a new OutputWriter given a partition key. */
+    def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = {
+      val partitionPath = getPartitionString(key).getString(0)
       val newFileSinkDesc = new FileSinkDesc(
-        fileSinkConf.getDirName + dynamicPartPath,
+        fileSinkConf.getDirName + partitionPath,
         fileSinkConf.getTableInfo,
         fileSinkConf.getCompressed)
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
@@ -234,7 +338,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       // to avoid write to the same file when `spark.speculation=true`
       val path = FileOutputFormat.getTaskOutputPath(
         conf.value,
-        dynamicPartPath.stripPrefix("/") + "/" + getOutputName)
+        partitionPath.stripPrefix("/") + "/" + getOutputName)
 
       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,
@@ -244,7 +348,5 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
         path,
         Reporter.NULL)
     }
-
-    writers.getOrElseUpdate(dynamicPartPath, newWriter())
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org