You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "m@xi" <ma...@gmail.com> on 2018/02/22 05:28:54 UTC

Re: Use keyBy to deterministically hash each record to a processor/task/slot

Hello!

Until now I have used your method to generate keys for the .keyBy()
function, so that I know exactly which processor id each tuple will end up
at (w.r.t. the key % #procs operation).

However, I had to switch to Java because the documentation is better. I
implemented your function in Java, but it is much slower: e.g. if I want to
generate 30 keys, it stalls for far too long.

Below I attach my Java version. If you or anyone else can suggest improvements
or spot any bugs, please let me know.

Thanks in advance.

Cheers,
Max

// --- --- ---

package org.apache.flink.srhcjoin.srhc;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.MathUtils;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public class KeyGen
{
    private int partitions;
    private int maxPartitions;
    private HashMap<Integer, Queue&lt;Integer>> cache = new HashMap<Integer,
Queue&lt;Integer>>();
    private int lastPosition = 1;

    public KeyGen(final int partitions, final int maxPartitions)
    {
        this.partitions    = partitions;
        this.maxPartitions = maxPartitions;
    }

    public KeyGen(final int partitions)
    {
        this.partitions    = partitions;
        this.maxPartitions = 128;
    }

    Integer next(int targetPartition)
    {
        Queue<Integer> queue;
        if (cache.containsKey(targetPartition))
            queue = cache.get(targetPartition);
        else
            queue = new LinkedList<Integer>();

        // Check if queue is empty
        if (queue.size() == 0)
        {
            boolean found = false;
            while (!found)
            {
                for (int id = lastPosition ; id < 100 ; id++)
                {
                    //System.out.println("Hey " + id);

                    int partition = (MathUtils.murmurHash(id) %
maxPartitions) * partitions / maxPartitions;

                    if (cache.containsKey(partition))
                        queue = cache.get(partition);
                    else
                        queue = new LinkedList<Integer>();
                    // Add element to the queue
                    queue.add(id);

                    if (partition == targetPartition) {
                        found = true;
                        break; // break the for loop
                    }
                }
            }
        }

        return queue.poll(); // Return the first elements and deletes it -->
similar to dequeue of scala's mutable.Queue
    }

    public static void main(String[] args) throws Exception
    {
        //Generate intermediate keys
        final int p = 4;
        int numPartitions = p;
        int numKeys       = p;
        int parallelism   = p;

        KeyGen keyGenerator = new KeyGen(numPartitions,
KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism));
        int[] procID = new int[numKeys];

        for (int i = 0; i < numKeys ; i++)
            procID[i] = keyGenerator.next(i);

        for (int elem : procID)
            System.out.println(elem);
    }
}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/