Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2017/03/23 14:14:38 UTC
keyBy doesn't evenly distribute keys
Hi all,
I want keyBy() to distribute records evenly over operator subtasks,
especially when there are only a few keys.
I ran some test code (see below if interested) with varying numbers of
keys and the parallelism set to 5.
The resulting key assignment to subtasks is as follows:
- 5 keys over 5 subtasks: 3, 1, 1, 0, 0 keys per subtask
- 10 keys over 5 subtasks: 4, 3, 1, 1, 1 keys per subtask
- 20 keys over 5 subtasks: 6, 5, 4, 3, 2 keys per subtask
- 30 keys over 5 subtasks: 8, 7, 6, 6, 3 keys per subtask
- 40 keys over 5 subtasks: 11, 10, 8, 3, 3 keys per subtask
- 50 keys over 5 subtasks: 13, 11, 10, 9, 7 keys per subtask
I repeated the test for each setting and found that the key assignment is
deterministic when the number of keys and the number of subtasks are fixed.
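The determinism follows from how keyBy() routes keys. As far as I understand Flink's KeyGroupRangeAssignment (1.2 and later), each key's hashCode() goes through a murmur hash, is taken modulo the max parallelism (128 by default for a small parallelism like 5, as far as I know) to get a key group, and the key group is then scaled down to a subtask index. The following is a standalone sketch under that assumption; KeyGroupSketch and its methods are hypothetical names, not Flink API. With only a handful of keys, even a well-mixed hash puts several keys on the same subtask. I have not checked that it reproduces the exact counts above; for position-based keys such as keyBy(0), Flink may hash a wrapper object rather than the bare Int.

```scala
// Hypothetical standalone model of Flink's key -> subtask routing,
// modelled on KeyGroupRangeAssignment / MathUtils. Exact hashing may
// differ between Flink versions; this is an illustration, not Flink code.
object KeyGroupSketch {
  // murmur-style scramble of a hashCode, forced to be non-negative
  def murmurHash(code0: Int): Int = {
    var code = code0
    code *= 0xcc9e2d51
    code = Integer.rotateLeft(code, 15)
    code *= 0x1b873593
    code = Integer.rotateLeft(code, 13)
    code = code * 5 + 0xe6546b64
    code ^= 4
    // final bit mix (fmix32)
    code ^= code >>> 16
    code *= 0x85ebca6b
    code ^= code >>> 13
    code *= 0xc2b2ae35
    code ^= code >>> 16
    if (code >= 0) code
    else if (code != Int.MinValue) -code
    else 0
  }

  // key group in [0, maxParallelism)
  def keyGroup(key: Any, maxParallelism: Int): Int =
    murmurHash(key.hashCode) % maxParallelism

  // contiguous ranges of key groups are owned by each subtask
  def operatorIndex(key: Any, maxParallelism: Int, parallelism: Int): Int =
    keyGroup(key, maxParallelism) * parallelism / maxParallelism

  // number of distinct keys that end up on each subtask
  def histogram(keys: Seq[Any], maxParallelism: Int, parallelism: Int): Seq[Int] = {
    val counts = Array.fill(parallelism)(0)
    keys.foreach(k => counts(operatorIndex(k, maxParallelism, parallelism)) += 1)
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    for (n <- Seq(5, 10, 20, 30, 40, 50))
      println(s"$n keys -> ${histogram(1 to n, 128, 5).mkString(", ")}")
  }
}
```

Because the mapping depends only on the key, the max parallelism and the parallelism, rerunning with the same settings always yields the same histogram.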
I managed to get an even distribution with partitionCustom().
But what I really want is a keyed stream, so that I can apply a window
function to a sliding window (not shown in the code below).
I found that Stephan once suggested generating a special key to be used by
keyBy(), as shown in
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-td5379.html,
but I cannot find any example of it.
How can I generate such a special key in order to distribute keys evenly
over operator subtasks?
Otherwise, is there another way to distribute keys evenly?
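One possible reading of the special-key idea: since the key-to-subtask mapping is deterministic, it can be evaluated up front, and for each original key a distinct surrogate Int can be searched for that lands on the subtask you want (round-robin by key index here). This sketch assumes the same hashing as Flink's KeyGroupRangeAssignment, a max parallelism of 128 (which I believe is the default for parallelism 5, or it can be pinned with env.setMaxParallelism), and that the job keys on the surrogate with a key selector returning a plain Int. BalancedKeys and surrogateKeys are hypothetical names, and the hash is copied rather than taken from flink-runtime, so it must match your Flink version.

```scala
import scala.collection.mutable

// Hypothetical helper: picks surrogate Int keys so that keyBy() spreads the
// original keys round-robin over subtasks. ASSUMES Flink's key-group hashing
// (murmur hash of hashCode, mod maxParallelism, scaled to parallelism).
object BalancedKeys {
  def murmurHash(code0: Int): Int = {
    var code = code0
    code *= 0xcc9e2d51
    code = Integer.rotateLeft(code, 15)
    code *= 0x1b873593
    code = Integer.rotateLeft(code, 13)
    code = code * 5 + 0xe6546b64
    code ^= 4
    code ^= code >>> 16
    code *= 0x85ebca6b
    code ^= code >>> 13
    code *= 0xc2b2ae35
    code ^= code >>> 16
    if (code >= 0) code else if (code != Int.MinValue) -code else 0
  }

  def operatorIndex(key: Int, maxParallelism: Int, parallelism: Int): Int =
    (murmurHash(key.hashCode) % maxParallelism) * parallelism / maxParallelism

  // Original keys are assumed distinct; the i-th key is sent to subtask
  // i % parallelism, and every original key gets its own surrogate so that
  // keyed state and windows are not merged across original keys.
  def surrogateKeys[K](keys: Seq[K], maxParallelism: Int, parallelism: Int): Map[K, Int] = {
    val used = mutable.Set.empty[Int]
    keys.zipWithIndex.map { case (k, idx) =>
      val target = idx % parallelism
      // smallest unused non-negative Int that lands on the target subtask
      val surrogate = Iterator.from(0)
        .find(c => !used(c) && operatorIndex(c, maxParallelism, parallelism) == target)
        .get
      used += surrogate
      k -> surrogate
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val m = surrogateKeys(1 to 5, 128, 5)
    m.toSeq.sortBy(_._1).foreach { case (k, s) =>
      println(s"key $k -> surrogate $s -> subtask ${operatorIndex(s, 128, 5)}")
    }
  }
}
```

In a job, the map would be built once on the client and applied before keying, e.g. map each record r to (surrogate(r._1), r) and then keyBy(_._1); this has not been tested against a running cluster.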
- Dongwon Kim
---- test code -----
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.collection.mutable
case class Data(time: Int)
object FlinkTest extends App {
  val parallelism = 5
  val numberOfKeys = 5 // 5, 10, 20, 30, 40, 50

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(parallelism)

  env
    .addSource(
      new RichSourceFunction[(Int, Data)] {
        // cancel() is called from a different thread than run(), hence @volatile
        @volatile var running = false

        override def open(parameters: Configuration): Unit = { running = true }

        override def run(ctx: SourceContext[(Int, Data)]): Unit = {
          val iter = (1 to 10000).iterator
          while (running && iter.hasNext) {
            val t = iter.next()
            // emit one record per key for each time step
            (1 to numberOfKeys) foreach { key =>
              ctx.collect((key, new Data(t)))
            }
          }
          running = false
        }

        override def cancel(): Unit = { running = false }
      }
    )
    .keyBy(0)
    // Alternative: route key k to subtask k % numPartitions
    // .partitionCustom(
    //   new Partitioner[Int]() {
    //     override def partition(key: Int, numPartitions: Int): Int =
    //       key % numPartitions
    //   },
    //   _._1
    // )
    .addSink(
      new RichSinkFunction[(Int, Data)] {
        // per-subtask count of records seen for each key
        var counts: mutable.HashMap[Int, Int] = _

        override def open(parameters: Configuration): Unit = {
          counts = new mutable.HashMap()
        }

        override def invoke(record: (Int, Data)): Unit = {
          val key = record._1
          val cnt = counts.getOrElseUpdate(key, 0)
          counts.update(key, cnt + 1)
        }

        // print how many keys this subtask received, with their record counts
        override def close(): Unit = {
          println(s"close : ${counts.size} $counts")
        }
      }
    )

  env.execute()
}