Posted to user@flink.apache.org by Julien <jm...@gmail.com> on 2018/02/18 12:49:51 UTC

A "per operator instance" window all ?

Hi,

I am pretty new to Flink and I don't know what would be the best way to
deal with the following use case:

  * as an input, I receive some alerts from a Kafka topic
      o an alert is linked to a network resource (like router-1,
        router-2, switch-1, switch-2, ...)
      o so an alert carries two main pieces of information (the alert id
        and the resource id of the resource on which the alert was raised)
  * then I need to query an external system in order to enrich
    the alert with additional information on the resource


(A "natural" candidate for the key on this stream will be the resource id)

The issue I have is that regarding the query to the external system:

  * I do not want to do 1 query per resource id
  * I want to do a small number of queries in parallel (for example 4
    queries in parallel every 500ms), each query requesting the external
    system for several alerts linked to several resource id

Currently, I don't know what the best way to deal with that would be:

  * I can key my stream on the resource id, define a processing-time
    window of 500ms, and do my query when the window fires
      o by doing so, I will "group" several alerts into a single query,
        but they will all be linked to the same resource
      o so I will do 1 query per resource id (which is too many in
        my use case)
  * I can also do a windowAll on a non-keyed stream
      o by doing so, I will "group" together alerts from different
        resource ids, but from what I've read, in such a case the
        parallelism will always be one
      o so in this case, I will only do 1 query, whereas I'd like to have
        some parallelism

I am thinking that a way to deal with that would be:

  * define the resource id as the key of the stream and set a parallelism of 4
  * and then have a way to do a windowAll on this keyed stream
      o meaning that, on a given operator instance, I would "group" into
        the same window all the keys (i.e. all the resource ids) managed
        by this operator instance
      o with a parallelism of 4, I would do 4 queries in parallel (1 per
        operator instance, and each query would cover several alerts
        linked to several resource ids)

But after looking at the documentation, I cannot see this ability 
(having a windowAll on a keyed stream).

Am I missing something?

What will be the best way to deal with such a use case?


For example, I've tried to rework my key into something like
"resourceId.hashCode % <max nb of queries in parallel>" and then to use a
time window.

In my example above, the <max nb of queries in parallel> is 4, so
all my keys are 0, 1, 2 or 3.

The issue with this approach is that, due to the way the operatorIdx is
computed based on the key, it does not distribute my processing well:

  * when this partitioning logic from the "KeyGroupRangeAssignment"
    class is applied:

        /**
         * Assigns the given key to a parallel operator index.
         *
         * @param key the key to assign
         * @param maxParallelism the maximum supported parallelism,
         *        aka the number of key-groups.
         * @param parallelism the current parallelism of the operator
         * @return the index of the parallel operator to which the
         *         given key should be routed.
         */
        public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
            return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
        }

        /**
         * Assigns the given key to a key-group index.
         *
         * @param key the key to assign
         * @param maxParallelism the maximum supported parallelism,
         *        aka the number of key-groups.
         * @return the key-group to which the given key is assigned
         */
        public static int assignToKeyGroup(Object key, int maxParallelism) {
            return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
        }
      o keys 0, 1, 2 and 3 are assigned only to operators 2 and 3 (so 2
        of my 4 operators will have nothing to do); a small check
        illustrating this is sketched below
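
For illustration, a small standalone check of this mapping could look like the
sketch below (the maxParallelism value of 128 is an assumption, corresponding to
Flink's default when nothing else is configured for small parallelism values):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeySkewCheck {
        public static void main(String[] args) {
            int maxParallelism = 128; // assumed default when max parallelism is not set
            int parallelism = 4;
            // Print the operator index each small integer key is routed to.
            for (int key = 0; key < parallelism; key++) {
                int operatorIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        key, maxParallelism, parallelism);
                System.out.println("key " + key + " -> operator " + operatorIndex);
            }
        }
    }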


So, what will be the best way to deal with that?


Thank you in advance for your support.

Regards.


Julien.



Re: A "per operator instance" window all ?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Julien,

at the moment Flink only supports parallel windows which are keyed. What
you would need is something like a per-partition window which is currently
not supported. The problem with that is that it is not clear how to rescale
a per-partition window because it effectively means that you have only as
many key groups as you have partitions. What you can also do is to key by a
prefix of your resource id. That way you will group more resource ids into
the same window. Choosing a prefix which gives you enough groups to evenly
utilize your workers as well as higher granularity for your external
requests should then be doable.
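
A minimal sketch of that prefix idea (purely illustrative: the Alert type, its
getResourceId() accessor, the window function, and the assumption that resource
ids look like "router-1" are not from the original mail; imports omitted):

    // Key by the resource "family" (e.g. "router", "switch") instead of the full id,
    // so several resource ids land in the same window while still giving multiple keys.
    alerts
        .keyBy(alert -> alert.getResourceId().split("-")[0])
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
        .process(new BatchedEnrichmentFunction()); // hypothetical: one external query per window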

Ken's solution should work for your use case. However, be aware that this
will break as soon as Flink changes its internal key to key-group mapping.

Cheers,
Till

On Mon, Feb 19, 2018 at 5:27 PM, Ken Krugler <kk...@transpac.com>
wrote:

> Hi Julien,
>
> I'd run into a similar situation, where I need to have a keyed stream, but
> I want (effectively) one key per task.
>
> It’s possible to generate keys that will get distributed as you need,
> though it does require making assumptions about how Flink generates
> hashes/key groups.
>
> And once you start talking about state, then it gets a bit harder, as you
> need to know the max parallelism, which is used to calculate “key groups”.
>
> Below is a cheesy function I wrote to make an Integer that (if used as the
> key) will partition the record to the target operator.
>
> I use it in a custom Map function to add a key field.
>
> — Ken
>
> /**
>  * Return an integer value that will get partitioned to the target <operatorIndex>, given the
>  * workflow's <maxParallelism> (for key groups) and the operator <parallelism>.
>  *
>  * @param maxParallelism
>  * @param parallelism
>  * @param operatorIndex
>  * @return Integer suitable for use in a record as the key.
>  */
> public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
>     if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
>         maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
>     }
>
>     for (int i = 0; i < maxParallelism * 2; i++) {
>         Integer key = new Integer(i);
>         int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
>         int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
>         if (index == operatorIndex) {
>             return key;
>         }
>     }
>
>     throw new RuntimeException(String.format("Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d)",
>             operatorIndex, maxParallelism, parallelism));
> }
>
>
>
> On Feb 19, 2018, at 12:34 AM, Julien <jm...@gmail.com> wrote:
>
> Hello,
>
> I've already tried to key my stream with "resourceId.hashCode%parallelism"
> (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time
> window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed
> stream. (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
>
> I think I should explore the customer partitioner, as you suggested
> Xingcan.
> Maybe my last question on this will be: "can you give me more details on
> this point "and simulate a window operation by yourself in a
> ProcessFunction" ?
>
> When I look at the documentation about the custom partitioner, I can see
> that the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a
> parallelism of 1, no ?).
>
> And if I do something like "myStream.partitionCustom(<my new
> partitioner>,<my key>).keyBy(<myKey>).window(...)", will it preserve my
> custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to
> the "KeyGroupStreamPartitioner" and forget my custom partitioner ?
>
> Thanks again for your feedback,
>
> Julien.
>
>
> On 19/02/2018 03:45, 周思华 wrote:
>
> Hi Julien,
>     If I am not misunderstand, I think you can key your stream on a
> `Random.nextInt() % parallesm`, this way  you can "group" together alerts
> from different and benefit from multi parallems.
>
>
> 发自网易邮箱大师
>
> On 02/19/2018 09:08,Xingcan Cui<xi...@gmail.com> <xi...@gmail.com>
> wrote:
>
> Hi Julien,
>
> sorry for my misunderstanding before. For now, the window can only be
> defined on a KeyedStream or an ordinary DataStream but with parallelism =
> 1. I’d like to provide three options for your scenario.
>
> 1. If your external data is static and can be fit into the memory, you can
> use ManagedStates
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state> to
> cache them without considering the querying problem.
> 2. Or you can use a CustomPartitioner
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning> to
> manually distribute your alert data and simulate an window operation by
> yourself in a ProcessFuncton.
> 3. You may also choose to use some external systems such as in-memory
> store, which can work as a cache for your queries.
>
> Best,
> Xingcan
>
> On 19 Feb 2018, at 5:55 AM, Julien <jm...@gmail.com> wrote:
>
> Hi Xingcan,
>
> Thanks for your answer.
> Yes, I understand that point:
>
>    - if I have 100 resource IDs with parallelism of 4, then each operator
>    instance will handle about 25 keys
>
>
> The issue I have is that I want, on a given operator instance, to group
> those 25 keys together in order to do only 1 query to an external system
> per operator instance:
>
>    - on a given operator instance, I will do 1 query for my 25 keys
>    - so with the 4 operator instances, I will do 4 query in parallel
>    (with about 25 keys per query)
>
>
> I do not know how I can do that.
>
> If I define a window on my keyed stream (with for example
> stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
> then my understanding is that the window is "associated" to the key. So
> in this case, on a given operator instance, I will have 25 of those windows
> (one per key), and I will do 25 queries (instead of 1).
>
> Do you understand my point ?
> Or maybe am I missing something ?
>
> I'd like to find a way on operator instance 1 to group all the alerts
> received on those 25 resource ids and do 1 query for those 25 resource ids.
> Same thing for operator instance 2, 3 and 4.
>
>
> Thank you,
> Regards.
>
>
> On 18/02/2018 14:43, Xingcan Cui wrote:
>
> Hi Julien,
>
> the cardinality of your keys (e.g., resource ID) will not be restricted to
> the parallelism. For instance, if you have 100 resource IDs processed by
> KeyedStream with parallelism 4, each operator instance will handle about 25
> keys.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On 18 Feb 2018, at 8:49 PM, Julien <jm...@gmail.com> wrote:
>
> Hi,
>
> I am pretty new to flink and I don't know what will be the best way to
> deal with the following use case:
>
>    - as an input, I recieve some alerts from a kafka topic
>       - an alert is linked to a network resource (like router-1,
>       router-2, switch-1, switch-2, ...)
>       - so an alert has two main information (the alert id and the
>       resource id of the resource on which this alert has been raised)
>    - then I need to do a query to an external system in order to enrich
>    the alert with additional information on the resource
>
>
> (A "natural" candidate for the key on this stream will be the resource id)
>
> The issue I have is that regarding the query to the external system:
>
>    - I do not want to do 1 query per resource id
>    - I want to do a small number of queries in parallel (for example 4
>    queries in parallel every 500ms), each query requesting the external system
>    for several alerts linked to several resource id
>
> Currently, I don't know what will be the best way to deal with that:
>
>    - I can key my stream on the resource id and then define a processing
>    time window of 500ms and when the trigger is ok, then I do my query
>       - by doing so, I will "group" several alerts in a single query, but
>       they will all be linked to the same resource.
>       - so I will do 1 query per resource id (which will be too much in
>       my use case)
>    - I can also do a windowAll on a non keyed stream
>       - by doing so, I will "group" together alerts from different
>       resource ids, but from what I've read in such a case the parallelism will
>       always be one.
>       - so in this case, I will only do 1 query whereas I'd like to have
>       some parallelism
>
> I am thinking that a way to deal with that will be:
>
>    - define the resource id as the key of stream and put a parallelism of
>    4
>    - and then having a way to do a windowAll on this keyed stream
>       - which is that, on a given operator instance, I will "group" on
>       the same window all the keys (ie all the resource ids) managed by this
>       operator instance
>       - with a parallelism of 4, I will do 4 queries in parallel (1 per
>       operator instance, and each query will be for several alerts linked to
>       several resource ids)
>
> But after looking at the documentation, I cannot see this ability (having
> a windowAll on a keyed stream).
>
> Am I missing something?
>
> What will be the best way to deal with such a use case?
>
>
> I've tried for example to review my key and to do something like
> "resourceId.hahsCode%<max nb of queries in parallel>" and then to use a
> time window.
>
> In my example above, the <max nb of queries in parallel> will be 4. And
> all my keys will be 0, 1, 2 or 3.
>
> The issue with this approach is that due to the way the operatorIdx is
> computed based on the key, it does not distribute well my processing:
>
>    - when this partitioning logic from the "KeyGroupRangeAssignment"
>    class is applied
>    - /**
>       * Assigns the given key to a parallel operator index.
>       *
>       * @param key the key to assign
>       * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>       * @param parallelism the current parallelism of the operator
>       * @return the index of the parallel operator to which the given key should be routed.
>       */
>      public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>          return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>      }
>
>      /**
>       * Assigns the given key to a key-group index.
>       *
>       * @param key the key to assign
>       * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>       * @return the key-group to which the given key is assigned
>       */
>      public static int assignToKeyGroup(Object key, int maxParallelism) {
>          return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>      }
>       - key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2
>       over my 4 operators will not have anything to do)
>
>
> So, what will be the best way to deal with that?
>
>
> Thank you in advance for your support.
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>

Re: A "per operator instance" window all ?

Posted by Ken Krugler <kk...@transpac.com>.
Hi Julien,

I'd run into a similar situation, where I need to have a keyed stream, but I want (effectively) one key per task.

It’s possible to generate keys that will get distributed as you need, though it does require making assumptions about how Flink generates hashes/key groups.

And once you start talking about state, then it gets a bit harder, as you need to know the max parallelism, which is used to calculate “key groups”.

Below is a cheesy function I wrote to make an Integer that (if used as the key) will partition the record to the target operator.

I use it in a custom Map function to add a key field.

— Ken

	/**
	 * Return an integer value that will get partitioned to the target <operatorIndex>, given the
	 * workflow's <maxParallelism> (for key groups) and the operator <parallelism>.
	 * 
	 * @param maxParallelism
	 * @param parallelism
	 * @param operatorIndex
	 * @return Integer suitable for use in a record as the key.
	 */
	public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
		if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
			maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
		}
		
		for (int i = 0; i < maxParallelism * 2; i++) {
			Integer key = new Integer(i);
			int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
			int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
			if (index == operatorIndex) {
				return key;
			}
		}
		
		throw new RuntimeException(String.format("Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d)", 
				operatorIndex, maxParallelism, parallelism));
	}
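
For reference, one hypothetical way to use such a helper (the Alert type, the
Tuple2 shape and the round-robin choice below are assumptions, not part of Ken's
job) is a map function that tags each record with a key targeting a chosen subtask:

	import org.apache.flink.api.common.functions.MapFunction;
	import org.apache.flink.api.java.tuple.Tuple2;

	public class AddTargetKey implements MapFunction<Alert, Tuple2<Integer, Alert>> {

		private final int maxParallelism;
		private final int parallelism;
		private int nextOperatorIndex = 0;

		public AddTargetKey(int maxParallelism, int parallelism) {
			this.maxParallelism = maxParallelism;
			this.parallelism = parallelism;
		}

		@Override
		public Tuple2<Integer, Alert> map(Alert alert) {
			// Pick target subtasks round-robin so records spread over all of them,
			// using Ken's helper above to find a key that lands on that subtask.
			Integer key = makeKeyForOperatorIndex(maxParallelism, parallelism, nextOperatorIndex);
			nextOperatorIndex = (nextOperatorIndex + 1) % parallelism;
			return Tuple2.of(key, alert);
		}
	}

The downstream keyBy would then simply use field 0 of the tuple before applying the window.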


> On Feb 19, 2018, at 12:34 AM, Julien <jm...@gmail.com> wrote:
> 
> Hello,
> 
> I've already tried to key my stream with "resourceId.hashCode%parallelism" (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed stream. (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
> 
> I think I should explore the customer partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: "can you give me more details on this point "and simulate a window operation by yourself in a ProcessFunction" ?
> 
> When I look at the documentation about the custom partitioner, I can see that the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a parallelism of 1, no ?).
> 
> And if I do something like "myStream.partitionCustom(<my new partitioner>,<my key>).keyBy(<myKey>).window(...)", will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to the "KeyGroupStreamPartitioner" and forget my custom partitioner ?
> 
> Thanks again for your feedback,
> 
> Julien.
> 
> 
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>>     If I am not misunderstand, I think you can key your stream on a `Random.nextInt() % parallesm`, this way  you can "group" together alerts from different and benefit from multi parallems.
>> 
>> 
>> 发自网易邮箱大师
>> 
>> On 02/19/2018 09:08,Xingcan Cui<xi...@gmail.com> <ma...@gmail.com> wrote: 
>> Hi Julien,
>> 
>> sorry for my misunderstanding before. For now, the window can only be defined on a KeyedStream or an ordinary DataStream but with parallelism = 1. I’d like to provide three options for your scenario.
>> 
>> 1. If your external data is static and can be fit into the memory, you can use ManagedStates <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state> to cache them without considering the querying problem.
>> 2. Or you can use a CustomPartitioner <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning> to manually distribute your alert data and simulate an window operation by yourself in a ProcessFuncton.
>> 3. You may also choose to use some external systems such as in-memory store, which can work as a cache for your queries.
>> 
>> Best,
>> Xingcan
>> 
>>> On 19 Feb 2018, at 5:55 AM, Julien <jmassiot77@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi Xingcan,
>>> 
>>> Thanks for your answer.
>>> Yes, I understand that point:
>>> if I have 100 resource IDs with parallelism of 4, then each operator instance will handle about 25 keys
>>> 
>>> 
>>> The issue I have is that I want, on a given operator instance, to group those 25 keys together in order to do only 1 query to an external system per operator instance:
>>> 
>>> on a given operator instance, I will do 1 query for my 25 keys
>>> so with the 4 operator instances, I will do 4 query in parallel (with about 25 keys per query)
>>> 
>>> I do not know how I can do that.
>>> 
>>> If I define a window on my keyed stream (with for example stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))), then my understanding is that the window is "associated" to the key. So in this case, on a given operator instance, I will have 25 of those windows (one per key), and I will do 25 queries (instead of 1).
>>> 
>>> Do you understand my point ?
>>> Or maybe am I missing something ?
>>> 
>>> I'd like to find a way on operator instance 1 to group all the alerts received on those 25 resource ids and do 1 query for those 25 resource ids.
>>> Same thing for operator instance 2, 3 and 4.
>>> 
>>> 
>>> Thank you,
>>> Regards.
>>> 
>>> 
>>> On 18/02/2018 14:43, Xingcan Cui wrote:
>>>> Hi Julien,
>>>> 
>>>> the cardinality of your keys (e.g., resource ID) will not be restricted to the parallelism. For instance, if you have 100 resource IDs processed by KeyedStream with parallelism 4, each operator instance will handle about 25 keys. 
>>>> 
>>>> Hope that helps.
>>>> 
>>>> Best,
>>>> Xingcan
>>>> 
>>>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassiot77@gmail.com <ma...@gmail.com>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I am pretty new to flink and I don't know what will be the best way to deal with the following use case:
>>>>> 
>>>>> as an input, I recieve some alerts from a kafka topic
>>>>> an alert is linked to a network resource (like router-1, router-2, switch-1, switch-2, ...)
>>>>> so an alert has two main information (the alert id and the resource id of the resource on which this alert has been raised)
>>>>> then I need to do a query to an external system in order to enrich the alert with additional information on the resource
>>>>> 
>>>>> (A "natural" candidate for the key on this stream will be the resource id)
>>>>> 
>>>>> The issue I have is that regarding the query to the external system:
>>>>> I do not want to do 1 query per resource id
>>>>> I want to do a small number of queries in parallel (for example 4 queries in parallel every 500ms), each query requesting the external system for several alerts linked to several resource id
>>>>> Currently, I don't know what will be the best way to deal with that:
>>>>> I can key my stream on the resource id and then define a processing time window of 500ms and when the trigger is ok, then I do my query
>>>>> by doing so, I will "group" several alerts in a single query, but they will all be linked to the same resource.
>>>>> so I will do 1 query per resource id (which will be too much in my use case)
>>>>> I can also do a windowAll on a non keyed stream
>>>>> by doing so, I will "group" together alerts from different resource ids, but from what I've read in such a case the parallelism will always be one.
>>>>> so in this case, I will only do 1 query whereas I'd like to have some parallelism
>>>>> I am thinking that a way to deal with that will be:
>>>>> 
>>>>> define the resource id as the key of stream and put a parallelism of 4
>>>>> and then having a way to do a windowAll on this keyed stream
>>>>> which is that, on a given operator instance, I will "group" on the same window all the keys (ie all the resource ids) managed by this operator instance
>>>>> with a parallelism of 4, I will do 4 queries in parallel (1 per operator instance, and each query will be for several alerts linked to several resource ids)
>>>>> But after looking at the documentation, I cannot see this ability (having a windowAll on a keyed stream).
>>>>> 
>>>>> Am I missing something?
>>>>> 
>>>>> What will be the best way to deal with such a use case?
>>>>> 
>>>>> 
>>>>> 
>>>>> I've tried for example to review my key and to do something like "resourceId.hahsCode%<max nb of queries in parallel>" and then to use a time window.
>>>>> 
>>>>> In my example above, the <max nb of queries in parallel> will be 4. And all my keys will be 0, 1, 2 or 3.
>>>>> 
>>>>> The issue with this approach is that due to the way the operatorIdx is computed based on the key, it does not distribute well my processing:
>>>>> 
>>>>> when this partitioning logic from the "KeyGroupRangeAssignment" class is applied
>>>>>     /**
>>>>>      * Assigns the given key to a parallel operator index.
>>>>>      *
>>>>>      * @param key the key to assign
>>>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>>>      * @param parallelism the current parallelism of the operator
>>>>>      * @return the index of the parallel operator to which the given key should be routed.
>>>>>      */
>>>>>     public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>>>         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>>>>>     }
>>>>> 
>>>>>     /**
>>>>>      * Assigns the given key to a key-group index.
>>>>>      *
>>>>>      * @param key the key to assign
>>>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>>>      * @return the key-group to which the given key is assigned
>>>>>      */
>>>>>     public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>>>         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>>>     }
>>>>> key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2 over my 4 operators will not have anything to do)
>>>>> 
>>>>> 
>>>>> So, what will be the best way to deal with that?
>>>>> 
>>>>> 
>>>>> 
>>>>> Thank you in advance for your support.
>>>>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: A "per operator instance" window all ?

Posted by Julien <jm...@gmail.com>.
Hi Xingcan, Ken and Till,

OK, thank you. It is clear.

I have various options then:

  * the one suggested by Ken, where I find a way to build a key that
    will be well distributed (1 key per task)
      o it relies on the way Flink partitions the key, but it will do
        the job
  * or I can go with another way to build my key where I will have
    more keys than the parallelism, so the distribution will be better
      o I will still have a small number of requests (far fewer than the
        number of resource ids, as 1 key will cover multiple resource ids)
      o I will potentially do multiple requests on the same task, but that
        may be acceptable, especially if I go with AsyncIO (see the sketch
        after this list)
  * or I can go with the OperatorState and implement my own firing logic
      o I am in a case where the memory-based mechanism should be fine

Thanks again,
Regards.


Julien.


On 20/02/2018 02:48, Xingcan Cui wrote:
> Hi Julien,
>
> you could use the OperatorState 
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-operator-state> to 
> cache the data in a window and the last time your window fired. Then 
> you check the ctx.timerService().currentProcessingTime() in 
> processElement() and once it exceeds the next window boundary, all the 
> cached data should be processed as if the window is fired.
>
> Note that currently, there are only memory-based operator states provided.
>
> Hope this helps,
> Xingcan
>
>> On 19 Feb 2018, at 4:34 PM, Julien <jmassiot77@gmail.com 
>> <ma...@gmail.com>> wrote:
>>
>> Hello,
>>
>> I've already tried to key my stream with 
>> "resourceId.hashCode%parallelism" (with parallelism of 4 in my example).
>> So all my keys will be either 0,1, 2 or 3. I can then benefit from a 
>> time window on this keyed stream and do only 4 queries to my external 
>> system.
>> But it is not well distributed with the default partitioner on keyed 
>> stream. (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
>>
>> I think I should explore the customer partitioner, as you suggested 
>> Xingcan.
>> Maybe my last question on this will be: "can you give me more details 
>> on this point "and simulate a window operation by yourself in a 
>> ProcessFunction" ?
>>
>> When I look at the documentation about the custom partitioner, I can 
>> see that the result of partitionCustom is a DataStream.
>> It is not a KeyedStream.
>> So the only window I have will be windowAll (which will bring me back 
>> to a parallelism of 1, no ?).
>>
>> And if I do something like "myStream.partitionCustom(<my new 
>> partitioner>,<my key>).keyBy(<myKey>).window(...)", will it preserve 
>> my custom partitioner ?
>> When looking at the "KeyedStream" class, it seems that it will go 
>> back to the "KeyGroupStreamPartitioner" and forget my custom 
>> partitioner ?
>>
>> Thanks again for your feedback,
>>
>> Julien.
>>
>>
>> On 19/02/2018 03:45, 周思华 wrote:
>>> Hi Julien,
>>>     If I am not misunderstand, I think you can key your stream on a 
>>> `Random.nextInt() % parallesm`, this way  you can "group" together 
>>> alerts from different and benefit from multi parallems.
>>>
>>>
>>> 发自网易邮箱大师
>>>
>>> On 02/19/2018 09:08,Xingcan Cui<xingcanc@gmail.com 
>>> <ma...@gmail.com>> wrote:
>>> Hi Julien,
>>>
>>> sorry for my misunderstanding before. For now, the window can only 
>>> be defined on a KeyedStream or an ordinary DataStream but with 
>>> parallelism = 1. I’d like to provide three options for your scenario.
>>>
>>> 1. If your external data is static and can be fit into the memory, 
>>> you can use ManagedStates to cache them without considering the 
>>> querying problem.
>>> 2. Or you can use a CustomPartitioner to manually distribute your 
>>> alert data and simulate an window operation by yourself in a 
>>> ProcessFuncton.
>>> 3. You may also choose to use some external systems such as 
>>> in-memory store, which can work as a cache for your queries.
>>>
>>> Best,
>>> Xingcan
>>>
>>>> On 19 Feb 2018, at 5:55 AM, Julien <jmassiot77@gmail.com 
>>>> <ma...@gmail.com>> wrote:
>>>>
>>>> Hi Xingcan,
>>>>
>>>> Thanks for your answer.
>>>> Yes, I understand that point:
>>>> • if I have 100 resource IDs with parallelism of 4, then each 
>>>> operator instance will handle about 25 keys
>>>>
>>>>
>>>> The issue I have is that I want, on a given operator instance, to 
>>>> group those 25 keys together in order to do only 1 query to an 
>>>> external system per operator instance:
>>>>
>>>> • on a given operator instance, I will do 1 query for my 25 keys
>>>> • so with the 4 operator instances, I will do 4 query in parallel 
>>>> (with about 25 keys per query)
>>>>
>>>> I do not know how I can do that.
>>>>
>>>> If I define a window on my keyed stream (with for 
>>>> example stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))), then 
>>>> my understanding is that the window is "associated" to the key. So 
>>>> in this case, on a given operator instance, I will have 25 of those 
>>>> windows (one per key), and I will do 25 queries (instead of 1).
>>>>
>>>> Do you understand my point ?
>>>> Or maybe am I missing something ?
>>>>
>>>> I'd like to find a way on operator instance 1 to group all the 
>>>> alerts received on those 25 resource ids and do 1 query for those 
>>>> 25 resource ids.
>>>> Same thing for operator instance 2, 3 and 4.
>>>>
>>>>
>>>> Thank you,
>>>> Regards.
>>>>
>>>>
>>>> On 18/02/2018 14:43, Xingcan Cui wrote:
>>>>> Hi Julien,
>>>>>
>>>>> the cardinality of your keys (e.g., resource ID) will not be 
>>>>> restricted to the parallelism. For instance, if you have 100 
>>>>> resource IDs processed by KeyedStream with parallelism 4, each 
>>>>> operator instance will handle about 25 keys.
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> Best,
>>>>> Xingcan
>>>>>
>>>>>> On 18 Feb 2018, at 8:49 PM, Julien <jm...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am pretty new to flink and I don't know what will be the best 
>>>>>> way to deal with the following use case:
>>>>>>
>>>>>> • as an input, I recieve some alerts from a kafka topic
>>>>>> • an alert is linked to a network resource (like router-1, 
>>>>>> router-2, switch-1, switch-2, ...)
>>>>>> • so an alert has two main information (the alert id and the 
>>>>>> resource id of the resource on which this alert has been raised)
>>>>>> • then I need to do a query to an external system in order to 
>>>>>> enrich the alert with additional information on the resource
>>>>>>
>>>>>> (A "natural" candidate for the key on this stream will be the 
>>>>>> resource id)
>>>>>>
>>>>>> The issue I have is that regarding the query to the external system:
>>>>>> • I do not want to do 1 query per resource id
>>>>>> • I want to do a small number of queries in parallel (for example 
>>>>>> 4 queries in parallel every 500ms), each query requesting the 
>>>>>> external system for several alerts linked to several resource id
>>>>>> Currently, I don't know what will be the best way to deal with that:
>>>>>> • I can key my stream on the resource id and then define a 
>>>>>> processing time window of 500ms and when the trigger is ok, then 
>>>>>> I do my query
>>>>>> • by doing so, I will "group" several alerts in a single query, 
>>>>>> but they will all be linked to the same resource.
>>>>>> • so I will do 1 query per resource id (which will be too much in 
>>>>>> my use case)
>>>>>> • I can also do a windowAll on a non keyed stream
>>>>>> • by doing so, I will "group" together alerts from different 
>>>>>> resource ids, but from what I've read in such a case the 
>>>>>> parallelism will always be one.
>>>>>> • so in this case, I will only do 1 query whereas I'd like to 
>>>>>> have some parallelism
>>>>>> I am thinking that a way to deal with that will be:
>>>>>>
>>>>>> • define the resource id as the key of stream and put a 
>>>>>> parallelism of 4
>>>>>> • and then having a way to do a windowAll on this keyed stream
>>>>>> • which is that, on a given operator instance, I will "group" on 
>>>>>> the same window all the keys (ie all the resource ids) managed by 
>>>>>> this operator instance
>>>>>> • with a parallelism of 4, I will do 4 queries in parallel (1 per 
>>>>>> operator instance, and each query will be for several alerts 
>>>>>> linked to several resource ids)
>>>>>> But after looking at the documentation, I cannot see this ability 
>>>>>> (having a windowAll on a keyed stream).
>>>>>>
>>>>>> Am I missing something?
>>>>>>
>>>>>> What will be the best way to deal with such a use case?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I've tried for example to review my key and to do something 
>>>>>> like "resourceId.hahsCode%<max nb of queries in parallel>" and 
>>>>>> then to use a time window.
>>>>>>
>>>>>> In my example above, the <max nb of queries in parallel> will be 
>>>>>> 4. And all my keys will be 0, 1, 2 or 3.
>>>>>>
>>>>>> The issue with this approach is that due to the way the 
>>>>>> operatorIdx is computed based on the key, it does not distribute 
>>>>>> well my processing:
>>>>>>
>>>>>> • when this partitioning logic from the "KeyGroupRangeAssignment" 
>>>>>> class is applied
>>>>>> •     /**
>>>>>>      * Assigns the given key to a parallel operator index.
>>>>>>      *
>>>>>>      * @param key the key to assign
>>>>>>      * @param maxParallelism the maximum supported parallelism, 
>>>>>> aka the number of key-groups.
>>>>>>      * @param parallelism the current parallelism of the operator
>>>>>>      * @return the index of the parallel operator to which the 
>>>>>> given key should be routed.
>>>>>>      */
>>>>>>     public static int assignKeyToParallelOperator(Object key, int 
>>>>>> maxParallelism, int parallelism) {
>>>>>>         return computeOperatorIndexForKeyGroup(maxParallelism, 
>>>>>> parallelism, assignToKeyGroup(key, maxParallelism));
>>>>>>     }
>>>>>>
>>>>>>     /**
>>>>>>      * Assigns the given key to a key-group index.
>>>>>>      *
>>>>>>      * @param key the key to assign
>>>>>>      * @param maxParallelism the maximum supported parallelism, 
>>>>>> aka the number of key-groups.
>>>>>>      * @return the key-group to which the given key is assigned
>>>>>>      */
>>>>>>     public static int assignToKeyGroup(Object key, int 
>>>>>> maxParallelism) {
>>>>>>         return computeKeyGroupForKeyHash(key.hashCode(), 
>>>>>> maxParallelism);
>>>>>>     }
>>>>>> • key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2 
>>>>>> over my 4 operators will not have anything to do)
>>>>>>
>>>>>>
>>>>>> So, what will be the best way to deal with that?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thank you in advance for your support.
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Julien.
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>>
>


Re: A "per operator instance" window all ?

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Julien,

you could use the OperatorState <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-operator-state> to cache the data in a window, along with the last time your window fired. Then you check ctx.timerService().currentProcessingTime() in processElement() and, once it exceeds the next window boundary, process all the cached data as if the window had fired.

Note that currently, there are only memory-based operator states provided.
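
A rough sketch of that firing logic (illustrative only: the Alert/EnrichedAlert
types and the queryExternalSystem() helper are placeholders, and checkpointing
of the buffer via operator state is left out for brevity) could be:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class BatchingEnricher extends ProcessFunction<Alert, EnrichedAlert> {

        private static final long WINDOW_MS = 500;

        private transient List<Alert> buffer;
        private transient long nextFireTime;

        @Override
        public void open(Configuration parameters) {
            buffer = new ArrayList<>();
            nextFireTime = 0L;
        }

        @Override
        public void processElement(Alert alert, Context ctx, Collector<EnrichedAlert> out) {
            long now = ctx.timerService().currentProcessingTime();
            if (nextFireTime == 0L) {
                nextFireTime = now + WINDOW_MS;
            }
            buffer.add(alert);
            if (now >= nextFireTime) {
                // One external query for everything buffered on this parallel instance.
                for (EnrichedAlert result : queryExternalSystem(buffer)) {
                    out.collect(result);
                }
                buffer.clear();
                nextFireTime = now + WINDOW_MS;
            }
        }

        // Placeholder for the real batched call to the external system.
        private List<EnrichedAlert> queryExternalSystem(List<Alert> batch) {
            return Collections.emptyList();
        }
    }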

Hope this helps,
Xingcan

> On 19 Feb 2018, at 4:34 PM, Julien <jm...@gmail.com> wrote:
> 
> Hello,
> 
> I've already tried to key my stream with "resourceId.hashCode%parallelism" (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed stream. (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
> 
> I think I should explore the customer partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: "can you give me more details on this point "and simulate a window operation by yourself in a ProcessFunction" ?
> 
> When I look at the documentation about the custom partitioner, I can see that the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a parallelism of 1, no ?).
> 
> And if I do something like "myStream.partitionCustom(<my new partitioner>,<my key>).keyBy(<myKey>).window(...)", will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to the "KeyGroupStreamPartitioner" and forget my custom partitioner ?
> 
> Thanks again for your feedback,
> 
> Julien.
> 
> 
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>>     If I am not misunderstand, I think you can key your stream on a `Random.nextInt() % parallesm`, this way  you can "group" together alerts from different and benefit from multi parallems.
>> 
>> 
>> 发自网易邮箱大师
>> 
>> On 02/19/2018 09:08,Xingcan Cui<xi...@gmail.com> wrote: 
>> Hi Julien,
>> 
>> sorry for my misunderstanding before. For now, the window can only be defined on a KeyedStream or an ordinary DataStream but with parallelism = 1. I’d like to provide three options for your scenario.
>> 
>> 1. If your external data is static and can be fit into the memory, you can use ManagedStates to cache them without considering the querying problem.
>> 2. Or you can use a CustomPartitioner to manually distribute your alert data and simulate an window operation by yourself in a ProcessFuncton.
>> 3. You may also choose to use some external systems such as in-memory store, which can work as a cache for your queries.
>> 
>> Best,
>> Xingcan
>> 
>>> On 19 Feb 2018, at 5:55 AM, Julien <jm...@gmail.com> wrote:
>>> 
>>> Hi Xingcan,
>>> 
>>> Thanks for your answer.
>>> Yes, I understand that point:
>>> 	• if I have 100 resource IDs with parallelism of 4, then each operator instance will handle about 25 keys
>>> 
>>> 
>>> The issue I have is that I want, on a given operator instance, to group those 25 keys together in order to do only 1 query to an external system per operator instance:
>>> 
>>> 	• on a given operator instance, I will do 1 query for my 25 keys
>>> 	• so with the 4 operator instances, I will do 4 query in parallel (with about 25 keys per query)
>>> 
>>> I do not know how I can do that.
>>> 
>>> If I define a window on my keyed stream (with for example stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))), then my understanding is that the window is "associated" to the key. So in this case, on a given operator instance, I will have 25 of those windows (one per key), and I will do 25 queries (instead of 1).
>>> 
>>> Do you understand my point ?
>>> Or maybe am I missing something ?
>>> 
>>> I'd like to find a way on operator instance 1 to group all the alerts received on those 25 resource ids and do 1 query for those 25 resource ids.
>>> Same thing for operator instance 2, 3 and 4.
>>> 
>>> 
>>> Thank you,
>>> Regards.
>>> 
>>> 
>>> On 18/02/2018 14:43, Xingcan Cui wrote:
>>>> Hi Julien,
>>>> 
>>>> the cardinality of your keys (e.g., resource ID) will not be restricted to the parallelism. For instance, if you have 100 resource IDs processed by KeyedStream with parallelism 4, each operator instance will handle about 25 keys. 
>>>> 
>>>> Hope that helps.
>>>> 
>>>> Best,
>>>> Xingcan
>>>> 
>>>>> On 18 Feb 2018, at 8:49 PM, Julien <jm...@gmail.com> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I am pretty new to flink and I don't know what will be the best way to deal with the following use case:
>>>>> 
>>>>> 	• as an input, I recieve some alerts from a kafka topic
>>>>> 		• an alert is linked to a network resource (like router-1, router-2, switch-1, switch-2, ...)
>>>>> 		• so an alert has two main information (the alert id and the resource id of the resource on which this alert has been raised)
>>>>> 	• then I need to do a query to an external system in order to enrich the alert with additional information on the resource
>>>>> 
>>>>> (A "natural" candidate for the key on this stream will be the resource id)
>>>>> 
>>>>> The issue I have is that regarding the query to the external system:
>>>>> 	• I do not want to do 1 query per resource id
>>>>> 	• I want to do a small number of queries in parallel (for example 4 queries in parallel every 500ms), each query requesting the external system for several alerts linked to several resource id
>>>>> Currently, I don't know what will be the best way to deal with that:
>>>>> 	• I can key my stream on the resource id and then define a processing time window of 500ms and when the trigger is ok, then I do my query
>>>>> 		• by doing so, I will "group" several alerts in a single query, but they will all be linked to the same resource.
>>>>> 		• so I will do 1 query per resource id (which will be too much in my use case)
>>>>> 	• I can also do a windowAll on a non keyed stream
>>>>> 		• by doing so, I will "group" together alerts from different resource ids, but from what I've read in such a case the parallelism will always be one.
>>>>> 		• so in this case, I will only do 1 query whereas I'd like to have some parallelism
>>>>> I am thinking that a way to deal with that will be:
>>>>> 
>>>>> 	• define the resource id as the key of stream and put a parallelism of 4
>>>>> 	• and then having a way to do a windowAll on this keyed stream
>>>>> 		• which is that, on a given operator instance, I will "group" on the same window all the keys (ie all the resource ids) managed by this operator instance
>>>>> 		• with a parallelism of 4, I will do 4 queries in parallel (1 per operator instance, and each query will be for several alerts linked to several resource ids)
>>>>> But after looking at the documentation, I cannot see this ability (having a windowAll on a keyed stream).
>>>>> 
>>>>> Am I missing something?
>>>>> 
>>>>> What will be the best way to deal with such a use case?
>>>>> 
>>>>> 
>>>>> 
>>>>> I've tried for example to review my key and to do something like "resourceId.hahsCode%<max nb of queries in parallel>" and then to use a time window.
>>>>> 
>>>>> In my example above, the <max nb of queries in parallel> will be 4. And all my keys will be 0, 1, 2 or 3.
>>>>> 
>>>>> The issue with this approach is that due to the way the operatorIdx is computed based on the key, it does not distribute well my processing:
>>>>> 
>>>>> 	• when this partitioning logic from the "KeyGroupRangeAssignment" class is applied
>>>>> 		•     /**
>>>>>      * Assigns the given key to a parallel operator index.
>>>>>      *
>>>>>      * @param key the key to assign
>>>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>>>      * @param parallelism the current parallelism of the operator
>>>>>      * @return the index of the parallel operator to which the given key should be routed.
>>>>>      */
>>>>>     public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>>>         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>>>>>     }
>>>>> 
>>>>>     /**
>>>>>      * Assigns the given key to a key-group index.
>>>>>      *
>>>>>      * @param key the key to assign
>>>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>>>      * @return the key-group to which the given key is assigned
>>>>>      */
>>>>>     public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>>>         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>>>     }
>>>>> 		• key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2 over my 4 operators will not have anything to do)
>>>>> 
>>>>> 
>>>>> So, what will be the best way to deal with that?
>>>>> 
>>>>> 
>>>>> 
>>>>> Thank you in advance for your support.
>>>>> 
>>>>> Regards.
>>>>> 
>>>>> 
>>>>> 
>>>>> Julien.
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 
> 


Re: A "per operator instance" window all ?

Posted by Julien <jm...@gmail.com>.
Hello,

I've already tried to key my stream with
"resourceId.hashCode % parallelism" (with a parallelism of 4 in my example).
So all my keys will be either 0, 1, 2 or 3. I can then benefit from a
time window on this keyed stream and do only 4 queries to my external
system.
But it is not well distributed with the default partitioner on a keyed
stream (keys 0, 1, 2 and 3 only go to operator indices 2 and 3).

I think I should explore the custom partitioner, as you suggested Xingcan.
Maybe my last question on this will be: can you give me more details on
this point, "and simulate a window operation by yourself in a
ProcessFunction"?

When I look at the documentation about the custom partitioner, I can see 
that the result of partitionCustom is a DataStream.
It is not a KeyedStream.
So the only window I have will be windowAll (which will bring me back to 
a parallelism of 1, no ?).

And if I do something like "myStream.partitionCustom(<my new 
partitioner>,<my key>).keyBy(<myKey>).window(...)", will it preserve my 
custom partitioner ?
When looking at the "KeyedStream" class, it seems that it will go back 
to the "KeyGroupStreamPartitioner" and forget my custom partitioner ?

Thanks again for your feedback,

Julien.


On 19/02/2018 03:45, 周思华 wrote:
> Hi Julien,
>     If I am not misunderstand, I think you can key your stream on a 
> `Random.nextInt() % parallesm`, this way  you can "group" together 
> alerts from different and benefit from multi parallems.
>
>
> 发自网易邮箱大师
>
> On 02/19/2018 09:08,Xingcan Cui<xi...@gmail.com> 
> <ma...@gmail.com> wrote:
>
>     Hi Julien,
>
>     sorry for my misunderstanding before. For now, the window can only
>     be defined on a KeyedStream or an ordinary DataStream but with
>     parallelism = 1. I’d like to provide three options for your scenario.
>
>     1. If your external data is static and can be fit into the memory,
>     you can use ManagedStates
>     <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state> to
>     cache them without considering the querying problem.
>     2. Or you can use a CustomPartitioner
>     <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning> to
>     manually distribute your alert data and simulate an window
>     operation by yourself in a ProcessFuncton.
>     3. You may also choose to use some external systems such as
>     in-memory store, which can work as a cache for your queries.
>
>     Best,
>     Xingcan
>
>>     On 19 Feb 2018, at 5:55 AM, Julien <jmassiot77@gmail.com
>>     <ma...@gmail.com>> wrote:
>>
>>     Hi Xingcan,
>>
>>     Thanks for your answer.
>>     Yes, I understand that point:
>>
>>       * if I have 100 resource IDs with parallelism of 4, then each
>>         operator instance will handle about 25 keys
>>
>>
>>     The issue I have is that I want, on a given operator instance, to
>>     group those 25 keys together in order to do only 1 query to an
>>     external system per operator instance:
>>
>>       * on a given operator instance, I will do 1 query for my 25 keys
>>       * so with the 4 operator instances, I will do 4 query in
>>         parallel (with about 25 keys per query)
>>
>>
>>     I do not know how I can do that.
>>
>>     If I define a window on my keyed stream (with for example
>>     stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
>>     then my understanding is that the window is "associated" to the
>>     key. So in this case, on a given operator instance, I will have
>>     25 of those windows (one per key), and I will do 25 queries
>>     (instead of 1).
>>
>>     Do you understand my point ?
>>     Or maybe am I missing something ?
>>
>>     I'd like to find a way on operator instance 1 to group all the
>>     alerts received on those 25 resource ids and do 1 query for those
>>     25 resource ids.
>>     Same thing for operator instance 2, 3 and 4.
>>
>>
>>     Thank you,
>>     Regards.
>>
>>
>>     On 18/02/2018 14:43, Xingcan Cui wrote:
>>>     Hi Julien,
>>>
>>>     the cardinality of your keys (e.g., resource ID) will not be
>>>     restricted to the parallelism. For instance, if you have 100
>>>     resource IDs processed by KeyedStream with parallelism 4, each
>>>     operator instance will handle about 25 keys.
>>>
>>>     Hope that helps.
>>>
>>>     Best,
>>>     Xingcan
>>>
>>>>     On 18 Feb 2018, at 8:49 PM, Julien <jmassiot77@gmail.com
>>>>     <ma...@gmail.com>> wrote:
>>>>
>>>>     Hi,
>>>>
>>>>     I am pretty new to flink and I don't know what will be the best
>>>>     way to deal with the following use case:
>>>>
>>>>       * as an input, I recieve some alerts from a kafka topic
>>>>           o an alert is linked to a network resource (like
>>>>             router-1, router-2, switch-1, switch-2, ...)
>>>>           o so an alert has two main information (the alert id and
>>>>             the resource id of the resource on which this alert has
>>>>             been raised)
>>>>       * then I need to do a query to an external system in order to
>>>>         enrich the alert with additional information on the resource
>>>>
>>>>
>>>>     (A "natural" candidate for the key on this stream will be the
>>>>     resource id)
>>>>
>>>>     The issue I have is that regarding the query to the external
>>>>     system:
>>>>
>>>>       * I do not want to do 1 query per resource id
>>>>       * I want to do a small number of queries in parallel (for
>>>>         example 4 queries in parallel every 500ms), each query
>>>>         requesting the external system for several alerts linked to
>>>>         several resource id
>>>>
>>>>     Currently, I don't know what will be the best way to deal with
>>>>     that:
>>>>
>>>>       * I can key my stream on the resource id and then define a
>>>>         processing time window of 500ms and when the trigger is ok,
>>>>         then I do my query
>>>>           o by doing so, I will "group" several alerts in a single
>>>>             query, but they will all be linked to the same resource.
>>>>           o so I will do 1 query per resource id (which will be too
>>>>             much in my use case)
>>>>       * I can also do a windowAll on a non keyed stream
>>>>           o by doing so, I will "group" together alerts from
>>>>             different resource ids, but from what I've read in such
>>>>             a case the parallelism will always be one.
>>>>           o so in this case, I will only do 1 query whereas I'd
>>>>             like to have some parallelism
>>>>
>>>>     I am thinking that a way to deal with that will be:
>>>>
>>>>       * define the resource id as the key of stream and put a
>>>>         parallelism of 4
>>>>       * and then having a way to do a windowAll on this keyed stream
>>>>           o which is that, on a given operator instance, I will
>>>>             "group" on the same window all the keys (ie all the
>>>>             resource ids) managed by this operator instance
>>>>           o with a parallelism of 4, I will do 4 queries in
>>>>             parallel (1 per operator instance, and each query will
>>>>             be for several alerts linked to several resource ids)
>>>>
>>>>     But after looking at the documentation, I cannot see this
>>>>     ability (having a windowAll on a keyed stream).
>>>>
>>>>     Am I missing something?
>>>>
>>>>     What will be the best way to deal with such a use case?
>>>>
>>>>
>>>>     I've tried for example to review my key and to do something
>>>>     like "resourceId.hahsCode%<max nb of queries in parallel>" and
>>>>     then to use a time window.
>>>>
>>>>     In my example above, the <max nb of queries in parallel> will
>>>>     be 4. And all my keys will be 0, 1, 2 or 3.
>>>>
>>>>     The issue with this approach is that due to the way the
>>>>     operatorIdx is computed based on the key, it does not
>>>>     distribute well my processing:
>>>>
>>>>       * when this partitioning logic from the
>>>>         "KeyGroupRangeAssignment" class is applied
>>>>           o /**
>>>>                  * Assigns the given key to a parallel operator index.
>>>>                  *
>>>>                  * @param key the key to assign
>>>>                  * @param maxParallelism the maximum supported
>>>>             parallelism, aka the number of key-groups.
>>>>                  * @param parallelism the current parallelism of
>>>>             the operator
>>>>                  * @return the index of the parallel operator to
>>>>             which the given key should be routed.
>>>>                  */
>>>>                 public static int
>>>>             assignKeyToParallelOperator(Object key, int
>>>>             maxParallelism, int parallelism) {
>>>>                     return
>>>>             computeOperatorIndexForKeyGroup(maxParallelism,
>>>>             parallelism, assignToKeyGroup(key, maxParallelism));
>>>>                 }
>>>>
>>>>                 /**
>>>>                  * Assigns the given key to a key-group index.
>>>>                  *
>>>>                  * @param key the key to assign
>>>>                  * @param maxParallelism the maximum supported
>>>>             parallelism, aka the number of key-groups.
>>>>                  * @return the key-group to which the given key is
>>>>             assigned
>>>>                  */
>>>>                 public static int assignToKeyGroup(Object key, int
>>>>             maxParallelism) {
>>>>                     return
>>>>             computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>>                 }
>>>>           o key 0, 1, 2 and 3 are only assigned to operator 2 and 3
>>>>             (so 2 of my 4 operators will not have anything to do)
>>>>
>>>>
>>>>     So, what will be the best way to deal with that?
>>>>
>>>>
>>>>     Thank you in advance for your support.
>>>>
>>>>     Regards.
>>>>
>>>>
>>>>     Julien.
>>>>
>>>>
>>>
>>
>


Re: A "per operator instance" window all ?

Posted by 周思华 <su...@163.com>.
Hi Julien,
    If I am not misunderstanding, I think you can key your stream on `Random.nextInt() % parallelism`; this way you can "group" together alerts from different resource ids and benefit from multiple parallel instances.
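
For what it's worth, a minimal sketch of that idea with the Java DataStream API (the Alert POJO, its bucket field and the bucket count are assumptions of mine, not from this thread) could look like this:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    import java.util.concurrent.ThreadLocalRandom;

    public class RandomBucketKeyingSketch {

        // Hypothetical alert type, only here to make the sketch self-contained.
        public static class Alert {
            public String alertId;
            public String resourceId;
            public int bucket;
        }

        public static KeyedStream<Alert, Integer> keyByRandomBucket(DataStream<Alert> alerts, final int buckets) {
            return alerts
                    // Tag each alert with a random bucket once, in a map(), so that the
                    // key selector below stays deterministic for a given record.
                    .map(new MapFunction<Alert, Alert>() {
                        @Override
                        public Alert map(Alert alert) {
                            alert.bucket = ThreadLocalRandom.current().nextInt(buckets);
                            return alert;
                        }
                    })
                    // Key on the stored bucket: alerts for many different resource ids
                    // share a bucket, so a downstream window can batch them together.
                    .keyBy(new KeySelector<Alert, Integer>() {
                        @Override
                        public Integer getKey(Alert alert) {
                            return alert.bucket;
                        }
                    });
        }
    }

Note that with only a handful of buckets the key-group skew described later in this thread can still leave some sub-tasks idle, so using noticeably more buckets than the operator parallelism usually spreads the load more evenly.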




Sent from NetEase Mail Master


On 02/19/2018 09:08, Xingcan Cui <xi...@gmail.com> wrote:
Hi Julien,


sorry for my misunderstanding before. For now, a window can only be defined on a KeyedStream, or on an ordinary DataStream where it always runs with parallelism = 1. I’d like to provide three options for your scenario.


1. If your external data is static and can fit into memory, you can use ManagedStates to cache it and sidestep the querying problem.
2. Or you can use a CustomPartitioner to manually distribute your alert data and simulate a window operation yourself in a ProcessFunction.
3. You may also choose to use an external system such as an in-memory store, which can work as a cache for your queries.


Best,
Xingcan



On 19 Feb 2018, at 5:55 AM, Julien <jm...@gmail.com> wrote:


Hi Xingcan,

Thanks for your answer.
Yes, I understand that point:

if I have 100 resource IDs with parallelism of 4, then each operator instance will handle about 25 keys




The issue I have is that I want, on a given operator instance, to group those 25 keys together in order to do only 1 query to an external system per operator instance:

on a given operator instance, I will do 1 query for my 25 keys
so with the 4 operator instances, I will do 4 queries in parallel (with about 25 keys per query)


I do not know how I can do that.

If I define a window on my keyed stream (with for example stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))), then my understanding is that the window is "associated" with the key. So in this case, on a given operator instance, I will have 25 of those windows (one per key), and I will do 25 queries (instead of 1).
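
For illustration, a rough Java rendering of the pipeline described above (the Alert type and the String output are placeholders, not from the original job) would be something like:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class PerKeyWindowSketch {

        // Hypothetical alert type, only here to make the sketch self-contained.
        public static class Alert {
            public String alertId;
            public String resourceId;
        }

        public static DataStream<String> enrich(DataStream<Alert> alerts) {
            return alerts
                    .keyBy(new KeySelector<Alert, String>() {
                        @Override
                        public String getKey(Alert alert) {
                            return alert.resourceId;
                        }
                    })
                    .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                    // The window is scoped to a single key, so this function fires once
                    // per resource id and per 500 ms window, i.e. one external query per
                    // resource id rather than one per operator instance.
                    .process(new ProcessWindowFunction<Alert, String, String, TimeWindow>() {
                        @Override
                        public void process(String resourceId, Context context,
                                            Iterable<Alert> alertsForKey, Collector<String> out) {
                            out.collect("would query the external system for resource " + resourceId);
                        }
                    });
        }
    }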

Do you understand my point?
Or maybe am I missing something?

I'd like to find a way on operator instance 1 to group all the alerts received on those 25 resource ids and do 1 query for those 25 resource ids.
Same thing for operator instance 2, 3 and 4.


Thank you,
Regards.


On 18/02/2018 14:43, Xingcan Cui wrote:

Hi Julien,


the cardinality of your keys (e.g., resource ID) will not be restricted to the parallelism. For instance, if you have 100 resource IDs processed by KeyedStream with parallelism 4, each operator instance will handle about 25 keys. 


Hope that helps.


Best,
Xingcan


On 18 Feb 2018, at 8:49 PM, Julien <jm...@gmail.com> wrote:



Hi,

I am pretty new to flink and I don't know what will be the best way to deal with the following use case:

as an input, I receive some alerts from a Kafka topic
an alert is linked to a network resource (like router-1, router-2, switch-1, switch-2, ...)
so an alert has two main pieces of information (the alert id and the resource id of the resource on which this alert has been raised)
then I need to do a query to an external system in order to enrich the alert with additional information on the resource

(A "natural" candidate for the key on this stream will be the resource id)

The issue I have is that regarding the query to the external system:

I do not want to do 1 query per resource id
I want to do a small number of queries in parallel (for example 4 queries in parallel every 500ms), each query requesting the external system for several alerts linked to several resource ids
Currently, I don't know what will be the best way to deal with that:

I can key my stream on the resource id and then define a processing time window of 500ms and when the trigger is ok, then I do my query
by doing so, I will "group" several alerts in a single query, but they will all be linked to the same resource.
so I will do 1 query per resource id (which will be too much in my use case)
I can also do a windowAll on a non keyed stream
by doing so, I will "group" together alerts from different resource ids, but from what I've read in such a case the parallelism will always be one.
so in this case, I will only do 1 query whereas I'd like to have some parallelism

I am thinking that a way to deal with that will be:

define the resource id as the key of stream and put a parallelism of 4
and then having a way to do a windowAll on this keyed stream
which is that, on a given operator instance, I will "group" on the same window all the keys (ie all the resource ids) managed by this operator instance
with a parallelism of 4, I will do 4 queries in parallel (1 per operator instance, and each query will be for several alerts linked to several resource ids)

But after looking at the documentation, I cannot see this ability (having a windowAll on a keyed stream).

Am I missing something?

What will be the best way to deal with such a use case?




I've tried for example to review my key and to do something like "resourceId.hashCode%<max nb of queries in parallel>" and then to use a time window.

In my example above, the <max nb of queries in parallel> will be 4. And all my keys will be 0, 1, 2 or 3.

The issue with this approach is that due to the way the operatorIdx is computed based on the key, it does not distribute well my processing:

when this partitioning logic from the "KeyGroupRangeAssignment" class is applied

    /**
     * Assigns the given key to a parallel operator index.
     *
     * @param key the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @param parallelism the current parallelism of the operator
     * @return the index of the parallel operator to which the given key should be routed.
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     * Assigns the given key to a key-group index.
     *
     * @param key the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @return the key-group to which the given key is assigned
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2 of my 4 operators will not have anything to do)
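
For reference, this routing can be checked directly with Flink's own utility class; the maxParallelism of 128 below is Flink's default lower bound and is an assumption here:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupSkewCheck {

        public static void main(String[] args) {
            int parallelism = 4;
            int maxParallelism = 128; // Flink's default lower bound when none is configured explicitly

            // Print which parallel sub-task each of the small integer keys 0..3 is routed to.
            for (int key = 0; key < parallelism; key++) {
                int operatorIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        key, maxParallelism, parallelism);
                System.out.println("key " + key + " -> operator " + operatorIndex);
            }
        }
    }

With these values the four keys collapse onto only two of the four sub-tasks, which matches the skew described above.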




So, what will be the best way to deal with that?




Thank you in advance for your support.

Regards.




Julien.












Re: A "per operator instance" window all ?

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Julien,

sorry for my misunderstanding before. For now, a window can only be defined on a KeyedStream, or on an ordinary DataStream where it always runs with parallelism = 1. I’d like to provide three options for your scenario.

1. If your external data is static and can fit into memory, you can use ManagedStates <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state> to cache it and sidestep the querying problem.
2. Or you can use a CustomPartitioner <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning> to manually distribute your alert data and simulate a window operation yourself in a ProcessFunction.
3. You may also choose to use an external system such as an in-memory store, which can work as a cache for your queries.
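
For illustration, a rough sketch of option 2 (the Alert type, the hash-based partitioner and the purely size-based flush are assumptions, not something prescribed by this thread) could look like:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    public class ManualBatchingSketch {

        // Hypothetical alert type, only here to make the sketch self-contained.
        public static class Alert {
            public String alertId;
            public String resourceId;
        }

        public static DataStream<List<Alert>> batchPerSubTask(DataStream<Alert> alerts) {
            return alerts
                    // Spread resource ids evenly over the downstream sub-tasks instead of
                    // relying on the key-group assignment of keyBy.
                    .partitionCustom(new Partitioner<String>() {
                        @Override
                        public int partition(String resourceId, int numPartitions) {
                            return Math.abs(resourceId.hashCode() % numPartitions);
                        }
                    }, new KeySelector<Alert, String>() {
                        @Override
                        public String getKey(Alert alert) {
                            return alert.resourceId;
                        }
                    })
                    // Buffer alerts per sub-task and emit them as one batch, so the next
                    // operator can issue a single query for many resource ids at once.
                    // NOTE: the buffer is plain operator memory, not checkpointed state,
                    // so this only sketches the data flow, not a fault-tolerant version.
                    .flatMap(new RichFlatMapFunction<Alert, List<Alert>>() {

                        private final int batchSize = 100; // assumption, not from the thread
                        private transient List<Alert> buffer;

                        @Override
                        public void open(Configuration parameters) {
                            buffer = new ArrayList<>();
                        }

                        @Override
                        public void flatMap(Alert alert, Collector<List<Alert>> out) {
                            buffer.add(alert);
                            if (buffer.size() >= batchSize) {
                                out.collect(new ArrayList<>(buffer));
                                buffer.clear();
                            }
                        }
                    });
        }
    }

The 500 ms time-based flush from the original question is not covered here, since timers are only available on keyed streams; a custom operator with processing-time timers, or the bucket-key idea suggested elsewhere in the thread, would be needed for that part.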

Best,
Xingcan

> On 19 Feb 2018, at 5:55 AM, Julien <jm...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks for your answer.
> Yes, I understand that point:
> if I have 100 resource IDs with parallelism of 4, then each operator instance will handle about 25 keys
> 
> The issue I have is that I want, on a given operator instance, to group those 25 keys together in order to do only 1 query to an external system per operator instance:
> 
> on a given operator instance, I will do 1 query for my 25 keys
> so with the 4 operator instances, I will do 4 queries in parallel (with about 25 keys per query)
> 
> I do not know how I can do that.
> 
> If I define a window on my keyed stream (with for example stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))), then my understanding is that the window is "associated" with the key. So in this case, on a given operator instance, I will have 25 of those windows (one per key), and I will do 25 queries (instead of 1).
> 
> Do you understand my point?
> Or maybe am I missing something?
> 
> I'd like to find a way on operator instance 1 to group all the alerts received on those 25 resource ids and do 1 query for those 25 resource ids.
> Same thing for operator instance 2, 3 and 4.
> 
> 
> Thank you,
> Regards.
> 
> 
> On 18/02/2018 14:43, Xingcan Cui wrote:
>> Hi Julien,
>> 
>> the cardinality of your keys (e.g., resource ID) will not be restricted to the parallelism. For instance, if you have 100 resource IDs processed by KeyedStream with parallelism 4, each operator instance will handle about 25 keys. 
>> 
>> Hope that helps.
>> 
>> Best,
>> Xingcan
>> 
>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassiot77@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I am pretty new to flink and I don't know what will be the best way to deal with the following use case:
>>> 
>>> as an input, I receive some alerts from a Kafka topic
>>> an alert is linked to a network resource (like router-1, router-2, switch-1, switch-2, ...)
>>> so an alert has two main pieces of information (the alert id and the resource id of the resource on which this alert has been raised)
>>> then I need to do a query to an external system in order to enrich the alert with additional information on the resource
>>> 
>>> (A "natural" candidate for the key on this stream will be the resource id)
>>> 
>>> The issue I have is that regarding the query to the external system:
>>> I do not want to do 1 query per resource id
>>> I want to do a small number of queries in parallel (for example 4 queries in parallel every 500ms), each query requesting the external system for several alerts linked to several resource ids
>>> Currently, I don't know what will be the best way to deal with that:
>>> I can key my stream on the resource id and then define a processing time window of 500ms and when the trigger is ok, then I do my query
>>> by doing so, I will "group" several alerts in a single query, but they will all be linked to the same resource.
>>> so I will do 1 query per resource id (which will be too much in my use case)
>>> I can also do a windowAll on a non keyed stream
>>> by doing so, I will "group" together alerts from different resource ids, but from what I've read in such a case the parallelism will always be one.
>>> so in this case, I will only do 1 query whereas I'd like to have some parallelism
>>> I am thinking that a way to deal with that will be:
>>> 
>>> define the resource id as the key of stream and put a parallelism of 4
>>> and then having a way to do a windowAll on this keyed stream
>>> which is that, on a given operator instance, I will "group" on the same window all the keys (ie all the resource ids) managed by this operator instance
>>> with a parallelism of 4, I will do 4 queries in parallel (1 per operator instance, and each query will be for several alerts linked to several resource ids)
>>> But after looking at the documentation, I cannot see this ability (having a windowAll on a keyed stream).
>>> 
>>> Am I missing something?
>>> 
>>> What will be the best way to deal with such a use case?
>>> 
>>> 
>>> I've tried for example to review my key and to do something like "resourceId.hashCode%<max nb of queries in parallel>" and then to use a time window.
>>> 
>>> In my example above, the <max nb of queries in parallel> will be 4. And all my keys will be 0, 1, 2 or 3.
>>> 
>>> The issue with this approach is that due to the way the operatorIdx is computed based on the key, it does not distribute well my processing:
>>> 
>>> when this partitioning logic from the "KeyGroupRangeAssignment" class is applied
>>>     /**
>>>      * Assigns the given key to a parallel operator index.
>>>      *
>>>      * @param key the key to assign
>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>      * @param parallelism the current parallelism of the operator
>>>      * @return the index of the parallel operator to which the given key should be routed.
>>>      */
>>>     public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>>>     }
>>> 
>>>     /**
>>>      * Assigns the given key to a key-group index.
>>>      *
>>>      * @param key the key to assign
>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>      * @return the key-group to which the given key is assigned
>>>      */
>>>     public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>     }
>>> key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2 of my 4 operators will not have anything to do)
>>> 
>>> So, what will be the best way to deal with that?
>>> 
>>> 
>>> 
>>> Thank you in advance for your support.
>>> 
>>> Regards.
>>> 
>>> 
>>> 
>>> Julien.
>>> 
>>> 
>> 
>