Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/19 08:54:09 UTC

[GitHub] [spark] albertusk95 commented on a change in pull request #15297: [SPARK-9862]Handling data skew

albertusk95 commented on a change in pull request #15297: [SPARK-9862]Handling data skew
URL: https://github.com/apache/spark/pull/15297#discussion_r305259187
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
 ##########
 @@ -184,10 +189,146 @@ class ExchangeCoordinator(
 
       i += 1
     }
-
     partitionStartIndices.toArray
   }
 
+
+
+ /**
+  * The skew-handling algorithm. Given the map output statistics of the pre-shuffle stages,
+  * the partitionStartIndices produced by the estimatePartitionStartIndices function, and the
+  * partition numbers of the pre-shuffle stages, it returns an array of 2-item tuples that is
+  * used to create SkewShuffleRowRDDs.
+  *
+  * Skewed partitions are located from mapOutputStatistics, and the partitionStartIndices
+  * produced by estimatePartitionStartIndices are reused to generate new partition start
+  * indices, called "skewPartitionStartIndices". skewPartitionStartIndices is the return
+  * value used to generate the SkewShuffleRowRDDs.
+  * For example, assume two stages with the following pre-shuffle partition size statistics,
+  * where stage 1 has 3 pre-shuffle (map) partitions:
+  * stage 1: [100 MB, 10 MB, 20000 MB, 10 MB, 30 MB]
+  * stage 2: [10 MB,  10 MB, 70 MB,  5 MB, 5 MB]
+  * Assuming the target input size is 128 MB, the skewed partition is obviously partition 2
+  * (the 20000 MB one), and the partitionStartIndices is [0, 3].
+  * As SPARK-9862 describes, we do not put the skewed partition into a single reduce task;
+  * instead, the matching partition of the other stage is broadcast to the tasks that read
+  * the skewed partition. So the skewPartitionStartIndices looks like this:
+  * (5 /*partition num*/,
+  *  ((-1 /*no skew*/, 0 /*index, like partitionStartIndices*/, 1 /*generate 1 partition*/),
+  *   (1 /*this side is skewed*/, 2 /*index*/, 3 /*generate 3 partitions*/),
+  *   (-1, 3, 1)))  // used to generate the SkewShuffleRowRDD of stage 1.
+  * (5 /*partition num*/,
+  *  ((-1 /*no skew*/, 0 /*index, like partitionStartIndices*/, 1 /*generate 1 partition*/),
+  *   (2 /*other side is skewed*/, 2 /*index*/, 3 /*generate 3 partitions*/),
+  *   (-1, 3, 1)))  // used to generate the SkewShuffleRowRDD of stage 2.
+  *
+  * @param mapOutputStatistics map output statistics of the pre-shuffle stages.
+  * @param prePartitionNum partition numbers of the pre-shuffle stages.
+  * @param partitionStartIndices provided by the estimatePartitionStartIndices function.
+  * @return an array of 2-item tuples. The first item of each tuple is how many partitions
+  * the SkewShuffleRowRDD should generate; if the value is -1, a ShuffledRowRDD is used
+  * instead. The second item is an array of (isSkew, partition index, generated partition
+  * count) entries. isSkew == -1 means no skew; isSkew == 1 means this side is skewed, so
+  * the SkewShuffleRowRDD should generate that many partitions, each reading a single map
+  * output block of the pre-shuffle partition; isSkew == 2 means the other side is skewed,
+  * so the SkewShuffleRowRDD should generate the same number of duplicated partitions.
+  */
+ def skewPartitionIdx(
+    mapOutputStatistics: Array[MapOutputStatistics],
+    prePartitionNum: Array[Int],
+    partitionStartIndices: Option[Array[Int]] = None):
+ Array[(Int, Array[(Int, Long, Int, Int)])] = {
+
+   if (mapOutputStatistics.length != 2 || !isJoin || joinType == FullOuter) {
+     return (0 until numExchanges).map(_ =>
+       (0, Array[(Int, Long, Int, Int)]((-1, 0, 0, 0)))).toArray
+   }
+   // find which partitions are skewed
+   var skewPartition = mapOutputStatistics.map(ms =>
+     ms.bytesByPartitionId.zipWithIndex
+       .filter(x => x._1 > skewThreshold)
+   )
+   // if the same partition exceeds the skew threshold in both stages,
+   // choose the larger one as the skewed side.
+   skewPartition = skewPartition.zipWithIndex.map(sti => {
+     val index = if (sti._2 == 0) 1 else 0
+     sti._1.filterNot(
+       p => skewPartition(index).exists(p1 => p1._2 == p._2 && p._1 < p1._1))
+   })
+
+   if (joinType == LeftOuter && skewPartition(1).length > 0 ||
+   joinType == RightOuter && skewPartition(0).length > 0 ) {
+     return (0 until numExchanges).map(_ =>
+       (0, Array[(Int, Long, Int, Int)]((-1, 0, 0, 0)))).toArray
+   }
+   // skewSize must great than TargetPostShuffleInputSize
 
 Review comment:
  `// skewSize must be greater than TargetPostShuffleInputSize`
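
  For reference, here is a minimal standalone Scala sketch of the detection step this diff implements. It is only an illustration, not the PR's code: the `Descriptor` case class, the stage 2 map task count, and the simplified (isSkew, partitionIndex, genPartitionNum) shape are assumptions taken from the scaladoc example above.

  ```scala
  // Sketch of the skew-detection idea: find the partitions whose pre-shuffle output
  // exceeds a threshold, pick the larger side when both sides exceed it, and emit a
  // per-stage descriptor for every post-shuffle partition.
  object SkewDetectionSketch {

    // isSkew: -1 = no skew, 1 = this side is skewed, 2 = the other side is skewed.
    final case class Descriptor(isSkew: Int, partitionIndex: Int, genPartitionNum: Int)

    def skewDescriptors(
        bytesByPartition: Array[Array[Long]], // per-stage, per-partition output sizes
        prePartitionNum: Array[Int],          // map task count of each pre-shuffle stage
        skewThreshold: Long): Array[Array[Descriptor]] = {
      require(bytesByPartition.length == 2, "sketch only handles a two-table join")
      val numPartitions = bytesByPartition.head.length

      // Which side (if any) is skewed for post-shuffle partition p; when both sides
      // exceed the threshold, the larger one wins, mirroring the filterNot in the diff.
      def skewSideOf(p: Int): Option[Int] = {
        val over = bytesByPartition.map(_(p)).zipWithIndex.filter(_._1 > skewThreshold)
        if (over.isEmpty) None else Some(over.maxBy(_._1)._2)
      }

      Array.tabulate(2) { side =>
        (0 until numPartitions).map { p =>
          skewSideOf(p) match {
            case Some(s) if s == side =>
              // Skewed side: split the partition into one task per map output block.
              Descriptor(1, p, prePartitionNum(side))
            case Some(s) =>
              // Other side is skewed: duplicate this partition for each of those tasks.
              Descriptor(2, p, prePartitionNum(s))
            case None =>
              Descriptor(-1, p, 1)
          }
        }.toArray
      }
    }

    def main(args: Array[String]): Unit = {
      val mb = 1024L * 1024L
      // Sizes from the scaladoc example; stage 1 has 3 map tasks, stage 2's count (4) is made up.
      val stage1 = Array(100L, 10L, 20000L, 10L, 30L).map(_ * mb)
      val stage2 = Array(10L, 10L, 70L, 5L, 5L).map(_ * mb)
      skewDescriptors(Array(stage1, stage2), Array(3, 4), skewThreshold = 128L * mb)
        .zipWithIndex.foreach { case (descs, i) =>
          println(s"stage ${i + 1}: ${descs.mkString(", ")}")
        }
    }
  }
  ```

  With the 128 MB threshold only partition 2 of stage 1 crosses it, so stage 1 gets (1, 2, 3) and stage 2 gets (2, 2, 3) for that partition, matching the descriptors in the scaladoc example.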

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org