Posted to user@spark.apache.org by Weitong Chen <26...@qq.com> on 2017/10/07 16:18:10 UTC
[spark-core] SortShuffleManager - when to enable Serialized sorting
Hi,
Why does canUseSerializedShuffle check dependency.aggregator rather than
dependency.mapSideCombine?
In the BaseShuffleHandle path, SortShuffleWriter uses dep.mapSideCombine to
decide whether dep.aggregator is passed to the sorter at all.
*canUseSerializedShuffle*
/**
 * Helper method for determining whether a shuffle should use an optimized serialized shuffle
 * path or whether it should fall back to the original path that operates on deserialized objects.
 */
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  } else if (dependency.aggregator.isDefined) {  // <-- checks aggregator, not mapSideCombine
    log.debug(
      s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
    false
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
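To make the order of checks explicit, here is a minimal, self-contained Scala sketch of the same decision, with no Spark dependency. DepInfo and canUseSerializedShuffleSketch are hypothetical names, not Spark API, and the 2^24 partition limit is an assumption (my reading of PackedRecordPointer packing the partition ID into 24 bits), not a value taken from the quoted code. The mapSideCombine field is carried along but never read, just as in the method above.

```scala
// Hypothetical, simplified stand-in for the ShuffleDependency fields that
// canUseSerializedShuffle looks at (not Spark's real class).
final case class DepInfo(
    serializerSupportsRelocation: Boolean,
    aggregatorDefined: Boolean,
    mapSideCombine: Boolean, // present only to show that it is never consulted
    numPartitions: Int)

// Assumed limit: 2^24 partitions (24-bit partition IDs). Not read from Spark.
val MaxSerializedModePartitions: Int = 1 << 24

// Mirrors the order of checks in the quoted method:
// serializer relocation, then aggregator, then partition count.
def canUseSerializedShuffleSketch(dep: DepInfo): Boolean =
  if (!dep.serializerSupportsRelocation) false
  else if (dep.aggregatorDefined) false // mapSideCombine is not looked at here
  else if (dep.numPartitions > MaxSerializedModePartitions) false
  else true
```

With these definitions, a dependency such as DepInfo(serializerSupportsRelocation = true, aggregatorDefined = true, mapSideCombine = false, numPartitions = 200) is rejected even though map-side combine is off.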
*SortShuffleWriter*
private[spark] class SortShuffleWriter[K, V, C](
  ...
  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {  // <-- mapSideCombine decides whether the aggregator is used
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)
    ...
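The mismatch the question points at is easiest to see with the two decisions side by side. The sketch below (plain Scala, no Spark dependency) models which aggregator SortShuffleWriter.write hands to ExternalSorter, and flags the case where canUseSerializedShuffle rejects a dependency for having an aggregator that the map-side sorter would never use. WriterDep, aggregatorPassedToSorter, rejectedForAggregator, and isQuestionCase are hypothetical names, and the aggregator is reduced to a String placeholder.

```scala
// Hypothetical stand-in for the ShuffleDependency fields SortShuffleWriter reads.
// The aggregator is modeled as an Option[String] placeholder, not Spark's Aggregator.
final case class WriterDep(aggregator: Option[String], mapSideCombine: Boolean)

// Which aggregator SortShuffleWriter.write passes to ExternalSorter, per the quoted code.
def aggregatorPassedToSorter(dep: WriterDep): Option[String] =
  if (dep.mapSideCombine) {
    // Same precondition as the quoted require(...); throws IllegalArgumentException.
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    dep.aggregator
  } else {
    None // the aggregator, if any, is ignored on the map side
  }

// The aggregator part of canUseSerializedShuffle's eligibility test.
def rejectedForAggregator(dep: WriterDep): Boolean = dep.aggregator.isDefined

// The case in question: the serialized path is ruled out because an aggregator
// exists, yet the map-side sorter in SortShuffleWriter would not use it.
def isQuestionCase(dep: WriterDep): Boolean =
  rejectedForAggregator(dep) && aggregatorPassedToSorter(dep).isEmpty
```

For example, WriterDep(Some("agg"), mapSideCombine = false) satisfies isQuestionCase, while the same aggregator with mapSideCombine = true does not.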