Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2019/06/09 20:10:00 UTC

[jira] [Updated] (SPARK-27853) Allow for custom Partitioning implementations

     [ https://issues.apache.org/jira/browse/SPARK-27853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-27853:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> Allow for custom Partitioning implementations
> ---------------------------------------------
>
>                 Key: SPARK-27853
>                 URL: https://issues.apache.org/jira/browse/SPARK-27853
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.0
>            Reporter: Marc Arndt
>            Priority: Major
>
> When partitioning a Dataset, Spark uses the physical plan element ShuffleExchangeExec together with a Partitioning instance.
> I find myself in a situation where I need to provide my own partitioning criterion, one that decides which partition each InternalRow belongs to. Based on the Spark API, I would expect to be able to supply this criterion as a custom implementation of the Partitioning interface.
> Sadly, a custom Partitioning implementation fails with an "Exchange not implemented for $newPartitioning" error because of the following code inside the ShuffleExchangeExec#prepareShuffleDependency method:
> {code:scala}
> val part: Partitioner = newPartitioning match {
>   case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
>   case HashPartitioning(_, n) =>
>     new Partitioner {
>       override def numPartitions: Int = n
>       // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
>       // `HashPartitioning.partitionIdExpression` to produce partitioning key.
>       override def getPartition(key: Any): Int = key.asInstanceOf[Int]
>     }
>   case RangePartitioning(sortingExpressions, numPartitions) =>
>     // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
>     // partition bounds. To get accurate samples, we need to copy the mutable keys.
>     val rddForSampling = rdd.mapPartitionsInternal { iter =>
>       val mutablePair = new MutablePair[InternalRow, Null]()
>       iter.map(row => mutablePair.update(row.copy(), null))
>     }
>     implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
>     new RangePartitioner(
>       numPartitions,
>       rddForSampling,
>       ascending = true,
>       samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
>   case SinglePartition =>
>     new Partitioner {
>       override def numPartitions: Int = 1
>       override def getPartition(key: Any): Int = 0
>     }
>   case _ => sys.error(s"Exchange not implemented for $newPartitioning")
>   // TODO: Handle BroadcastPartitioning.
> }
> def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
>   case RoundRobinPartitioning(numPartitions) =>
>     // Distributes elements evenly across output partitions, starting from a random partition.
>     var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
>     (row: InternalRow) => {
>       // The HashPartitioner will handle the `mod` by the number of partitions
>       position += 1
>       position
>     }
>   case h: HashPartitioning =>
>     val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
>     row => projection(row).getInt(0)
>   case RangePartitioning(_, _) | SinglePartition => identity
>   case _ => sys.error(s"Exchange not implemented for $newPartitioning")
> }
> {code}
> The snippet above matches the Partitioning instance "newPartitioning" against a fixed set of hardcoded Partitioning types. A new Partitioning implementation matches none of these patterns, so the match will fall through to the fallback case:
> {code:scala}
>     case _ => sys.error(s"Exchange not implemented for $newPartitioning")
> {code}
> and throw an exception.
> To support custom partitioning behaviour, I suggest changing the implementation in ShuffleExchangeExec so that it works with an arbitrary Partitioning implementation. The Partitioner creation could be handled inside the Partitioning classes themselves, for example via a Partitioning#createPartitioner method.
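
One way to picture the proposal is the minimal, Spark-free sketch below. It is only an illustration under stated assumptions: the traits are simplified stand-ins rather than Spark's actual classes, and every name other than createPartitioner (partitionKeyExtractor, SinglePartitioning, RegionPartitioning, ShuffleSketch) is hypothetical. The point is that the exchange code asks the Partitioning for its Partitioner and key extractor instead of pattern matching on concrete types.

```scala
// Simplified stand-in for org.apache.spark.Partitioner (assumption, not Spark's class).
trait Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Hypothetical shape of a Partitioning interface extended as the issue suggests.
trait Partitioning[Row] {
  def numPartitions: Int
  // Plays the role of getPartitionKeyExtractor: turns a row into a shuffle key.
  def partitionKeyExtractor: Row => Any
  // The proposed hook: each Partitioning builds its own Partitioner.
  def createPartitioner(): Partitioner
}

// A built-in behaviour expressed through the interface instead of a match case.
final case class SinglePartitioning[Row]() extends Partitioning[Row] {
  val numPartitions: Int = 1
  val partitionKeyExtractor: Row => Any = (row: Row) => row
  def createPartitioner(): Partitioner = new Partitioner {
    val numPartitions: Int = 1
    def getPartition(key: Any): Int = 0
  }
}

// A user-defined partitioning: one partition per known region, plus one
// overflow partition for rows whose region is not listed.
final case class RegionPartitioning(regions: Seq[String])
    extends Partitioning[(String, Int)] {
  val numPartitions: Int = regions.length + 1
  val partitionKeyExtractor: ((String, Int)) => Any = row => row._1
  def createPartitioner(): Partitioner = new Partitioner {
    private val index: Map[String, Int] = regions.zipWithIndex.toMap
    val numPartitions: Int = regions.length + 1
    def getPartition(key: Any): Int =
      index.getOrElse(key.asInstanceOf[String], regions.length)
  }
}

object ShuffleSketch {
  // Stand-in for the exchange logic: no knowledge of concrete Partitioning types.
  def assign[Row](rows: Seq[Row], partitioning: Partitioning[Row]): Map[Int, Seq[Row]] = {
    val partitioner = partitioning.createPartitioner()
    val extractKey = partitioning.partitionKeyExtractor
    rows.groupBy(row => partitioner.getPartition(extractKey(row)))
  }
}
```

With this shape, calling ShuffleSketch.assign(rows, RegionPartitioning(Seq("eu", "us"))) groups rows by region without any change to the assign method, which is the property the issue asks ShuffleExchangeExec to have.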


