You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2017/02/15 16:32:56 UTC

[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16898#discussion_r101320333
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
    @@ -363,80 +393,42 @@ object FileFormatWriter extends Logging {
             committer.newTaskTempFile(taskAttemptContext, partDir, ext)
           }
     
    -      currentWriter = description.outputWriterFactory.newInstance(
    +      currentWriter = desc.outputWriterFactory.newInstance(
             path = path,
    -        dataSchema = description.dataColumns.toStructType,
    +        dataSchema = desc.dataColumns.toStructType,
             context = taskAttemptContext)
         }
     
         override def execute(iter: Iterator[InternalRow]): Set[String] = {
    -      // We should first sort by partition columns, then bucket id, and finally sorting columns.
    -      val sortingExpressions: Seq[Expression] =
    -        description.partitionColumns ++ bucketIdExpression ++ sortColumns
    -      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
    -
    -      val sortingKeySchema = StructType(sortingExpressions.map {
    -        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
    -        // The sorting expressions are all `Attribute` except bucket id.
    -        case _ => StructField("bucketId", IntegerType, nullable = false)
    -      })
    +      val getPartitionColsAndBucketId = UnsafeProjection.create(
    +        desc.partitionColumns ++ bucketIdExpression, desc.allColumns)
     
    -      // Returns the data columns to be written given an input row
    -      val getOutputRow = UnsafeProjection.create(
    -        description.dataColumns, description.allColumns)
    -
    -      // Returns the partition path given a partition key.
    -      val getPartitionStringFunc = UnsafeProjection.create(
    -        Seq(Concat(partitionStringExpression)), description.partitionColumns)
    -
    -      // Sorts the data before write, so that we only need one writer at the same time.
    -      val sorter = new UnsafeKVExternalSorter(
    -        sortingKeySchema,
    -        StructType.fromAttributes(description.dataColumns),
    -        SparkEnv.get.blockManager,
    -        SparkEnv.get.serializerManager,
    -        TaskContext.get().taskMemoryManager().pageSizeBytes,
    -        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
    -          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
    +      // Generates the partition path given the row generated by `getPartitionColsAndBucketId`.
    +      val getPartPath = UnsafeProjection.create(
    +        Seq(Concat(partitionPathExpression)), desc.partitionColumns)
     
    -      while (iter.hasNext) {
    -        val currentRow = iter.next()
    -        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
    -      }
    -
    -      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
    -        identity
    -      } else {
    -        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
    -          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
    -        })
    -      }
    -
    -      val sortedIterator = sorter.sortedIterator()
    +      // Returns the data columns to be written given an input row
    +      val getOutputRow = UnsafeProjection.create(desc.dataColumns, desc.allColumns)
     
           // If anything below fails, we should abort the task.
           var recordsInFile: Long = 0L
           var fileCounter = 0
    -      var currentKey: UnsafeRow = null
    +      var currentPartColsAndBucketId: UnsafeRow = null
           val updatedPartitions = mutable.Set[String]()
    -      while (sortedIterator.next()) {
    -        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
    -        if (currentKey != nextKey) {
    -          // See a new key - write to a new partition (new file).
    -          currentKey = nextKey.copy()
    -          logDebug(s"Writing partition: $currentKey")
    +      for (row <- iter) {
    +        val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
    --- End diff --
    
    If you take a look at `GenerateUnsafeProjection`, it actually reuses the same row instance for every call, so we need to copy the result here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org