Posted to user@spark.apache.org by Weitong Chen <26...@qq.com> on 2017/10/07 16:18:10 UTC

[spark-core] SortShuffleManager - when to enable Serialized sorting

hi,

   Why does canUseSerializedShuffle check dependency.aggregator rather than
dependency.mapSideCombine?
   In SortShuffleWriter (the writer used for the BaseShuffleHandle path), it
is dep.mapSideCombine that decides whether dep.aggregator is passed to the
sorter or not. A concrete case where the two checks differ is sketched at
the end of this mail.
   
  *canUseSerializedShuffle*
  /**
   * Helper method for determining whether a shuffle should use an optimized
   * serialized shuffle path or whether it should fall back to the original
   * path that operates on deserialized objects.
   */
  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
        s"${dependency.serializer.getClass.getName}, does not support object relocation")
      false
    } else if (dependency.aggregator.isDefined) {
      log.debug(
        s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      false
    } else {
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
    }
  }
}
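
   For context, I think this check is consulted from
SortShuffleManager.registerShuffle when the shuffle handle is picked. The
snippet below is paraphrased from the 2.x source from memory, so details
may be slightly off:

  *registerShuffle* (paraphrased)
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // Few reduce partitions and no map-side combine: write one file per
      // partition and concatenate them at the end.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Serialized sorting: buffer map outputs in serialized form.
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise buffer map outputs in deserialized form (SortShuffleWriter).
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }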
 
*SortShuffleWriter*
private[spark] class SortShuffleWriter[K, V, C](
  ...
  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter,
      // because we don't care whether the keys get sorted in each partition; that
      // will be done on the reduce side if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)
    ...
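
   The concrete case I have in mind is groupByKey: if I read
PairRDDFunctions correctly, it sets mapSideCombine = false but still
attaches an Aggregator to the ShuffleDependency (it is applied on the
reduce side), so the aggregator check above rules out the serialized path
even though SortShuffleWriter would not hand that aggregator to the sorter
either. A small local sketch to check the dependency fields (the object
name and the expected output are my own assumption, not from Spark):

  import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

  object AggregatorVsMapSideCombine {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setMaster("local[2]").setAppName("shuffle-dependency-check"))
      val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

      // groupByKey disables map-side combine but still defines an Aggregator,
      // which is used when the groups are built on the reduce side.
      val grouped = pairs.groupByKey(4)
      val dep = grouped.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, _]]

      println(s"aggregator defined = ${dep.aggregator.isDefined}") // I expect: true
      println(s"mapSideCombine     = ${dep.mapSideCombine}")       // I expect: false

      sc.stop()
    }
  }

   If that is right, the serialized path is rejected for groupByKey by the
aggregator check alone, which is why I wonder whether checking
dep.mapSideCombine (as SortShuffleWriter does) would be enough.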

   



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
