Posted to user@flink.apache.org by "m@xi" <ma...@gmail.com> on 2017/10/30 15:20:38 UTC

Use keyBy to deterministically hash each record to a processor/task/slot

Hi all,

After trying to understand exactly how keyBy works internally, I did not get
anything more than "it applies obj.hashCode() % n", where n is the number of
tasks/processors.

This post, for example,
https://stackoverflow.com/questions/45062061/why-is-keyed-stream-on-a-keyby-creating-skewed-downstream-execution,
suggests implementing a KeySelector and writing our own hashCode function.
None of this is clear to me, though, especially the hashCode part.

I am running a PC with 4 slots/processors and I would like to hash each
record, based on a certain field, to a specific processor. Ideally, let's say
that the 4 processors have ids 0, 1, 2, 3. Then I would like to send the
tuples whose (key % 4) = 0 to the processor with id 0, those whose
(key % 4) = 1 to the processor with id 1, and so on.

I would like to know exactly which processor/task each tuple goes to.
Can I do that deterministically with keyBy in Flink?

Thanks in advance.
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Use keyBy to deterministically hash each record to a processor/task/slot

Posted by "m@xi" <ma...@gmail.com>.
Hello Dongwon,

Thanks a lot for your excellent reply! It seems we have the same problem,
though your solution is less hard-coded than mine.

I am also looking forward to seeing support for a custom partitioner for
keyBy() in Flink.

Best,
Max


Re: Use keyBy to deterministically hash each record to a processor/task/slot

Posted by "m@xi" <ma...@gmail.com>.
Hello!

Until now I have used your method to generate keys for the .keyBy()
function, so that I know exactly which processor id each tuple will end up
on (w.r.t. the key % #procs operation).

However, I had to switch to Java because the documentation is better. I
implemented your function in Java, but it is much slower: for example, if I
try to generate 30 keys, it stalls for far too long.

Below I attach my Java version. If you or anyone else can suggest
improvements or spot any bugs, please let me know.

Thanks in advance.

Cheers,
Max

// --- --- ---

package org.apache.flink.srhcjoin.srhc;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.MathUtils;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;

public class KeyGen
{
    private int partitions;
    private int maxPartitions;
    private HashMap<Integer, Queue<Integer>> cache =
            new HashMap<Integer, Queue<Integer>>();
    private int lastPosition = 1;

    public KeyGen(final int partitions, final int maxPartitions)
    {
        this.partitions    = partitions;
        this.maxPartitions = maxPartitions;
    }

    public KeyGen(final int partitions)
    {
        this.partitions    = partitions;
        this.maxPartitions = 128;
    }

    Integer next(int targetPartition)
    {
        Queue<Integer> queue;
        if (cache.containsKey(targetPartition))
            queue = cache.get(targetPartition);
        else
            queue = new LinkedList<Integer>();

        // Check if queue is empty
        if (queue.size() == 0)
        {
            boolean found = false;
            while (!found)
            {
                for (int id = lastPosition ; id < 100 ; id++)
                {
                    //System.out.println("Hey " + id);

                    int partition = (MathUtils.murmurHash(id) % maxPartitions)
                            * partitions / maxPartitions;

                    if (cache.containsKey(partition))
                        queue = cache.get(partition);
                    else
                        queue = new LinkedList<Integer>();
                    // Add element to the queue
                    queue.add(id);

                    if (partition == targetPartition) {
                        found = true;
                        break; // break the for loop
                    }
                }
            }
        }

        // Returns the first element and removes it, similar to dequeue on
        // Scala's mutable.Queue.
        return queue.poll();
    }

    public static void main(String[] args) throws Exception
    {
        //Generate intermediate keys
        final int p = 4;
        int numPartitions = p;
        int numKeys       = p;
        int parallelism   = p;

        KeyGen keyGenerator = new KeyGen(numPartitions,
                KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism));
        int[] procID = new int[numKeys];

        for (int i = 0; i < numKeys ; i++)
            procID[i] = keyGenerator.next(i);

        for (int elem : procID)
            System.out.println(elem);
    }
}
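
One possible explanation for the stall, offered as a sketch rather than a
confirmed diagnosis: in the class above, lastPosition is never advanced, the
queues created inside the loop are never stored in the cache, and the scan
stops at id 99. If no id below 100 hashes to the requested partition, the
while loop repeats the same scan forever. The Java sketch below keeps the same
partition arithmetic but remembers its scan position and caches every id it
sees. The names CachingKeyGen and partitionOf and the pluggable hash parameter
are illustrative only and not part of Flink; in a Flink project one would
presumably pass MathUtils::murmurHash as the hash. The stand-in hash in main
exists only so the example runs without Flink on the classpath.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.IntUnaryOperator;

/** Sketch of a key generator that advances its scan position and caches every id it sees. */
final class CachingKeyGen {
    private final int partitions;
    private final int maxPartitions;
    private final IntUnaryOperator hash; // assumed to return non-negative values
    private final Map<Integer, Queue<Integer>> cache = new HashMap<>();
    private int nextId = 1; // only ever increases, so no id is scanned twice

    CachingKeyGen(int partitions, int maxPartitions, IntUnaryOperator hash) {
        this.partitions = partitions;
        this.maxPartitions = maxPartitions;
        this.hash = hash;
    }

    /** Same arithmetic as in the thread: key group first, then partition index. */
    int partitionOf(int id) {
        return (hash.applyAsInt(id) % maxPartitions) * partitions / maxPartitions;
    }

    /** Returns a not-yet-used key that maps to targetPartition. */
    int next(int targetPartition) {
        if (targetPartition < 0 || targetPartition >= partitions) {
            throw new IllegalArgumentException("no such partition: " + targetPartition);
        }
        Queue<Integer> queue = cache.computeIfAbsent(targetPartition, p -> new ArrayDeque<>());
        while (queue.isEmpty()) {
            if (nextId == Integer.MAX_VALUE) {
                throw new IllegalStateException("ran out of candidate ids");
            }
            int id = nextId++;
            // File the id under the partition it hashes to, so later calls can reuse it.
            cache.computeIfAbsent(partitionOf(id), p -> new ArrayDeque<>()).add(id);
        }
        return queue.remove();
    }

    public static void main(String[] args) {
        // Stand-in hash for a runnable demo; this is NOT Flink's murmur hash.
        IntUnaryOperator standIn = x -> (x * 0x9E3779B1) >>> 1;
        CachingKeyGen gen = new CachingKeyGen(30, 128, standIn);
        for (int p = 0; p < 30; p++) {
            System.out.println("partition " + p + " <- key " + gen.next(p));
        }
    }
}
```

Because every scanned id is cached under its own partition, a request for 30
keys scans each candidate id at most once in total instead of restarting from
1 on every call.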


Re: Use keyBy to deterministically hash each record to a processor/task/slot

Posted by Dongwon Kim <ea...@gmail.com>.
Hello,

Since I need to generate exactly as many keys as there are partitions, I
also suffer from this problem [1].
My current solution is to keep generating keys until I have at least one
key per partition, which looks very stupid to me (my code is pasted below).
If Flink changes the way it computes a partition from a given key for the
keyBy operator, I will need to modify my vulnerable key generator.
Stephan once mentioned in [2] that a custom partitioner for keyBy can make
things complicated.

I hope Flink can provide a way to specify a custom partitioner for keyBy.
I know Flink primarily targets data-intensive applications, as mentioned in
[3], but compute-intensive applications (especially from the machine
learning/deep learning domain) can require this feature for evenly
distributing a small number of keys over a similarly small number of
partitions.

Below is my *vulnerable* key generator written in Scala:
>>>>
import org.apache.flink.util.MathUtils
import scala.collection.mutable

class KeyGenerator(val partitions: Int,
                  val maxPartitions: Int) {

  def this(partitions: Int) = this(partitions, 128)

  val ids = Stream.from(1).iterator
  val cache = mutable.HashMap[Int, mutable.Queue[Int]]()

  def next(targetPartition: Int): Int = {
    val queue = cache.getOrElseUpdate(targetPartition, mutable.Queue[Int]())
    if (queue.size == 0) {
      var found = false
      while (!found) {
        val id = ids.next
        val partition =
          (MathUtils.murmurHash(id) % maxPartitions) * partitions / maxPartitions

        cache
          .getOrElseUpdate(partition, mutable.Queue[Int]())
          .enqueue(id)

        if (partition == targetPartition) {
          found = true
        }
      }
    }
    queue.dequeue()
  }
}
<<<<

I use it like this:
>>>>
import org.apache.flink.runtime.state.KeyGroupRangeAssignment
...
    val numPartitions = 10
    val numKeys = 10
    val parallelism = 10

    val keyGenerator = new KeyGenerator(numPartitions,
      KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism))
    val desiredKeys = (0 until numKeys) map keyGenerator.next
...
<<<<

Thanks,

[1]
https://image.slidesharecdn.com/pdm-with-apache-flink-flinkforward-170919021613/95/predictive-maintenance-with-deep-learning-and-apache-flink-41-638.jpg?cb=1505787617
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-tt5379.html#a5389
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-CustomPartitionerWrapper-with-KeyedStream-td8481.htm

- Dongwon


Re: Use keyBy to deterministically hash each record to a processor/task/slot

Posted by "m@xi" <ma...@gmail.com>.
Hello Tony,

Thanks a lot for your answer. Now I know exactly what happens with the keyBy
function, yet I still haven't figured out a proper (non-hard-coded) way to
deterministically send a tuple to each key.

If someone from the Flink team could help, it would be great!

Max


Re: Use keyBy to deterministically hash each record to a processor/task/slot

Posted by Tony Wei <to...@gmail.com>.
Hi Max,

Flink decides which subtask a key goes to in
`KeyGroupRangeAssignment.assignKeyToParallelOperator` [1].
Its first step is to assign the key to a key group based on the max
parallelism [2]. It then assigns each key group to a specific subtask based
on the current parallelism [3].

You asked whether keyBy in Flink is deterministic. I think the answer is
yes, but the catch is that the key group is not computed from
`obj.hashCode()` alone, but from `murmurhash(obj.hashCode())`.
If you know the murmurhash output for each object, you can determine which
subtask that key will go to.
I'm not sure this is a good solution, and I also wonder whether it is
feasible in practice.

Best Regards,
Tony Wei

[1]
https://github.com/apache/flink/blob/0182141d41be52ae0cf4a3563d4b8c6f3daca02e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L47
[2]
https://github.com/apache/flink/blob/0182141d41be52ae0cf4a3563d4b8c6f3daca02e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L58
[3]
https://github.com/apache/flink/blob/0182141d41be52ae0cf4a3563d4b8c6f3daca02e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L115
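
The two steps Tony describes can be sketched in plain Java as follows. This is
a minimal illustration, not Flink's actual source: the class name
KeyGroupSketch and its method names are made up for this example, and the
murmurHash body is transcribed from memory of Flink's MathUtils.murmurHash, so
treat it as an assumption and call the real method in an actual job. The
partition arithmetic (hash % maxParallelism, then keyGroup * parallelism /
maxParallelism) is the same as in the key generators posted in this thread.

```java
/** Sketch of how keyBy picks a subtask: key -> key group -> operator index. */
final class KeyGroupSketch {

    /** Murmur-style integer mix (assumed to match MathUtils.murmurHash); never negative. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final bit mixing.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Step 1: the key group depends only on the key and the max parallelism. */
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Step 2: contiguous ranges of key groups are mapped to subtasks. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Both steps combined. */
    static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // value used as the default in this thread
        int parallelism = 4;      // Max's 4 slots
        for (int key = 0; key < 8; key++) {
            System.out.println("key " + key
                    + " -> key group " + keyGroup(key, maxParallelism)
                    + " -> subtask " + operatorIndex(key, maxParallelism, parallelism));
        }
    }
}
```

With parallelism 4 and max parallelism 128, each subtask owns 32 consecutive
key groups, so the subtask of an integer key is determined by its hashed key
group rather than by key % 4.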