Posted to dev@flink.apache.org by "Iaroslav Zeigerman (Jira)" <ji...@apache.org> on 2021/03/01 19:03:00 UTC

[jira] [Created] (FLINK-21548) keyBy operation produces skewed event distribution with low-cardinality keys

Iaroslav Zeigerman created FLINK-21548:
------------------------------------------

             Summary: keyBy operation produces skewed event distribution with low-cardinality keys
                 Key: FLINK-21548
                 URL: https://issues.apache.org/jira/browse/FLINK-21548
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Coordination, Runtime / Task
    Affects Versions: 1.12.1, 1.11.0
            Reporter: Iaroslav Zeigerman
         Attachments: Screen Shot 2021-03-01 at 10.52.31 AM.png, Screen Shot 2021-03-01 at 10.54.42 AM.png, Screen Shot 2021-03-01 at 10.57.33 AM.png

When the number of distinct keys matches the parallelism, not all tasks of the downstream operator receive records, and those that do are loaded unevenly.

For example, with 500 unique keys in [0, 500), only 313 of the 500 downstream tasks receive any records at all.
This behavior can easily be reproduced with the following test case:
{code:scala}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment
import scala.util.Random

object Test {

  val parallelism = 500
  val recordsNum  = 1000000

  def run(): Unit = {
    // Note: `0 to recordsNum` is inclusive, so key 0 receives one extra record.
    val recordIds = (0 to recordsNum).map(_ % parallelism)
    val tasks     = recordIds.map(selectTask)

    println(s"Total unique keys: ${recordIds.toSet.size}")
    println(s"Key distribution: ${recordIds.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
    println("=======================")
    println(s"Tasks involved: ${tasks.toSet.size}")
    println(s"Record distribution by task: ${tasks.groupBy(identity).mapValues(_.size).toVector.sortBy(-_._2)}")
  }

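  // Maps a key to a key group; `parallelism` is passed as the maxParallelism argument,
  // so each key group index stands in for one downstream task.
  def selectTask(key: Int): Int =
    KeyGroupRangeAssignment.assignToKeyGroup(
      key,
      parallelism
    )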
}
{code}
This produces the following results:
{noformat}
Total unique keys: 500
Key distribution: Vector((0,2001), (69,2000), ..., (232,2000), (100,2000))
=======================
Tasks involved: 313
Record distribution by task: Vector((147,10000), (248,10000), ..., (232,2000), (100,2000))
{noformat}
Record distribution visualized:
 !Screen Shot 2021-03-01 at 10.52.31 AM.png!

I have determined that, in order to utilize all tasks, the number of unique keys should be at least 5 times the parallelism. The relation between the number of unique keys and the fraction of utilized tasks appears to be exponential:
 !Screen Shot 2021-03-01 at 10.54.42 AM.png!  
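
One way to read this curve is against the baseline of an idealized hash that assigns each distinct key to a task independently and uniformly at random. Under that assumption (which is about an idealized hash, not about Flink's implementation), the expected fraction of tasks that receive at least one of k keys out of n tasks is 1 - (1 - 1/n)^k. A minimal sketch of that calculation follows; the names `UniformBaseline` and `expectedUtilization` are hypothetical.
{code:scala}
object UniformBaseline {

  // Expected fraction of tasks that receive at least one key when `keys`
  // distinct keys are assigned independently and uniformly at random to
  // `tasks` tasks (idealized hash, not Flink's actual implementation).
  def expectedUtilization(keys: Int, tasks: Int): Double =
    1.0 - math.pow(1.0 - 1.0 / tasks, keys.toDouble)

  def main(args: Array[String]): Unit = {
    val tasks = 500
    for (factor <- 1 to 5) {
      val fraction = expectedUtilization(factor * tasks, tasks)
      println(f"keys = ${factor}x parallelism: expected utilized fraction = $fraction%.3f")
    }
  }
}
{code}
For 500 keys and 500 tasks this baseline comes to roughly 0.632, or about 316 tasks, which can be compared with the 313 tasks observed above.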

But even with 5 times as many unique keys as the parallelism, the skew is still quite significant:
!Screen Shot 2021-03-01 at 10.57.33 AM.png!

Given that the keys used in my test are integer values, for which `hashCode` returns the value itself, I tend to believe that the skew is caused by Flink's murmur hash implementation, which is used [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76].
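
For context, Flink routes a keyed record in two steps: the key's `hashCode` is murmur-hashed and taken modulo the max parallelism to pick a key group, and the key group is then mapped to a subtask index. A variant of the test that follows both steps through `KeyGroupRangeAssignment.assignKeyToParallelOperator` might look like the sketch below. It assumes the job leaves max parallelism at its default (derived here via `computeDefaultMaxParallelism`) and that `flink-runtime` is on the classpath; the names `RoutingSketch` and `selectSubtask` are hypothetical.
{code:scala}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object RoutingSketch {

  val parallelism = 500

  // Assumption: maxParallelism is not set explicitly on the job,
  // so the default derived from the operator parallelism applies.
  val maxParallelism: Int =
    KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism)

  // Step 1 (key -> key group) and step 2 (key group -> subtask index)
  // are both performed inside assignKeyToParallelOperator.
  def selectSubtask(key: Int): Int =
    KeyGroupRangeAssignment.assignKeyToParallelOperator(
      Int.box(key),
      maxParallelism,
      parallelism
    )

  def main(args: Array[String]): Unit = {
    val subtasks = (0 until parallelism).map(selectSubtask)
    println(s"maxParallelism: $maxParallelism")
    println(s"Subtasks involved: ${subtasks.toSet.size}")
  }
}
{code}
Comparing the number of subtasks involved here with the key-group count from the test case above would show how much of the skew survives the second mapping step.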


