Posted to user@flink.apache.org by Ron Crocker <rc...@newrelic.com> on 2017/11/03 03:06:02 UTC

Making external calls from a FlinkKafkaPartitioner

We have a system where the Kafka partition a message should go into is a function of a value in the message. Often, it’s value % # partitions, but for some values it’s not - it’s a specified list of partitions that changes over time. Our “simple Java library” that produces messages for this system also has a background thread that periodically polls an HTTP endpoint (once per minute by default) to refresh that list of special cases.

It’s easy to create a FlinkKafkaPartitioner that does the mod operation; what I’m not so sure about is how to get this polling operation into the partitioner. I’m about to try it the obvious way (create a background thread that polls the URL and updates the partition map), but I wonder if that’s actually going to cause a bunch of problems for the Flink runtime.

Here’s the code that I have right now:
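import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {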
    private final String partitionerURL;
    private final long updateIntervalInMillis;
    private Map<Long, List<Integer>> partitionMap;
    private ScheduledExecutorService executor;

    public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
        this.partitionerURL = partitionerURL;
        this.updateIntervalInMillis = updateIntervalInMillis;
        this.partitionMap = new HashMap<>();
    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(
                () -> updatePartitionMapRunnable(),
                updateIntervalInMillis,
                updateIntervalInMillis,
                TimeUnit.MILLISECONDS);

    }

    private void updatePartitionMapRunnable() {
        // Make synchronous request to partitionerURL
        // This is a simple JSON that matches our data
        String response = "{1:[1,2,3],2:[2]}";
        // Replace current partitionMap with new HashMap from the response
        this.partitionMap = convertResponseToMap(response); 
        // Replacing the current value of partitionMap with the updated version doesn't
        // require synchronization
    }

    private Map<Long, List<Integer>> convertResponseToMap(String response) {
        Map<Long, List<Integer>> hashMap = new HashMap<>();
        // Convert response to JSON structure and just use that?
        // or Iterate and add to local hashMap
        return hashMap;
    }

    @Override
    public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        long myKey = next.f0;
        
        if (partitionMap.containsKey(myKey)) {
            List<Integer> partitions = partitionMap.get(myKey);
            myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
        }
        
        return (int)(myKey % numPartitions);
    }
}
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcrocker@newrelic.com
M: +1 630 363 8835


Re: Making external calls from a FlinkKafkaPartitioner

Posted by Ron Crocker <rc...@newrelic.com>.
Thanks Nico -

Thanks for the feedback, and nice catch on the missing volatile. 

Ron

> On Nov 3, 2017, at 7:48 AM, Nico Kruber <ni...@data-artisans.com> wrote:
> 
> Hi Ron,
> imho your code should be fine (except for a potential visibility problem on the 
> changes of the non-volatile partitionMap member, depending on your needs).
> 
> The #open() method should be called (once) for each sink initialization 
> (according to the javadoc) and then you should be fine with the asynchronous 
> updater thread.
> I'm including Gordon (cc'd) just to be sure as he may know more.
> 
> 
> Nico
> 


Re: Making external calls from a FlinkKafkaPartitioner

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Ron,
imho your code should be fine (except for a potential visibility problem on the 
changes of the non-volatile partitionMap member, depending on your needs).
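
To make the visibility point concrete, here is a minimal sketch of the map handling on its own, without the Flink classes. The class name PartitionRouter and its methods replace() and route() are hypothetical, not part of Ron's code or of any Flink API. It assumes the updater always publishes a freshly built map and never mutates it (or its lists) afterwards. It also reads the field once per record, so a swap between containsKey() and get() cannot produce a null list, and it uses Math.floorMod so a negative key cannot yield a negative partition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, for illustration only.
final class PartitionRouter {
    // volatile: once the updater thread assigns a new map, the sink thread
    // is guaranteed to see it on its next read of this field.
    private volatile Map<Long, List<Integer>> partitionMap = Collections.emptyMap();

    /** Called from the updater thread: publish a complete, read-only copy. */
    public void replace(Map<Long, List<Integer>> fresh) {
        partitionMap = Collections.unmodifiableMap(new HashMap<>(fresh));
    }

    /** Same routing rule as partition(): override list if present, else key mod numPartitions. */
    public int route(long key, int numPartitions) {
        // Read the volatile field exactly once so the lookup below works
        // on one consistent map even if the updater swaps it meanwhile.
        Map<Long, List<Integer>> snapshot = partitionMap;
        List<Integer> overrides = snapshot.get(key);
        long target = key;
        if (overrides != null && !overrides.isEmpty()) {
            target = overrides.get(ThreadLocalRandom.current().nextInt(overrides.size()));
        }
        // floorMod keeps the result in [0, numPartitions) for negative keys too.
        return (int) Math.floorMod(target, (long) numPartitions);
    }
}
```

In the partitioner, partition() would then reduce to something like route(next.f0, numPartitions), with updatePartitionMapRunnable() calling replace().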

The #open() method should be called (once) for each sink initialization 
(according to the javadoc) and then you should be fine with the asynchronous 
updater thread.
I'm including Gordon (cc'd) just to be sure as he may know more.
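
For the updater thread itself, a sketch along these lines may help; PartitionMapPoller, fetch and publish are hypothetical names, and the HTTP call is left to the caller. Two assumptions are baked in. First, the partitioner code in this thread shows no shutdown hook, so the thread is created as a daemon thread to avoid keeping the JVM alive. Second, the first poll runs immediately rather than after one full interval as in the original open(). The try/catch matters because scheduleAtFixedRate suppresses all later executions once a run throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only.
final class PartitionMapPoller<T> {
    private final Supplier<T> fetch;     // e.g. the HTTP request plus parsing
    private final Consumer<T> publish;   // e.g. assignment to the volatile field
    private final long intervalMillis;
    private ScheduledExecutorService executor;

    PartitionMapPoller(Supplier<T> fetch, Consumer<T> publish, long intervalMillis) {
        this.fetch = fetch;
        this.publish = publish;
        this.intervalMillis = intervalMillis;
    }

    /** Intended to be called from open(). */
    void start() {
        executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread t = new Thread(runnable, "partition-map-poller");
            t.setDaemon(true); // do not block JVM shutdown
            return t;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                publish.accept(fetch.get());
            } catch (RuntimeException e) {
                // Keep the previously published value. Letting the exception
                // escape would cancel every subsequent scheduled run.
            }
        }, 0L, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Call this if a suitable shutdown point is available. */
    void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
```

A failed poll then simply leaves the last good map in place until the next interval.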


Nico
