Posted to user@flink.apache.org by M Singh <ma...@yahoo.com> on 2022/01/22 13:32:28 UTC

Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Hi Folks:
I am working on an exploratory project in which I would like to add/remove KeyedStreams (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby) without restarting the Flink streaming application.
Is this possible natively in Apache Flink? If not, is there a framework or pattern that can be used to implement this without restarting the application or changing the code?
Thanks








Re: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by M Singh <ma...@yahoo.com>.
 
Thanks Edward for your explanation. I missed the part about the aggregationKey being added by the processor.

RE: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
Here is some sample data which may help visualize how the aggregation is changed dynamically.
We start by aggregating by session and by session+account, placing values into aggregationKey based on the fields in groupByFields.
Then we delete the session+account aggregation and add an aggregation by account.
The aggregation changes dynamically because we key by an indirect field, aggregationKey, which we add based on the current broadcast state.
Note that this is for streaming jobs, and an aggregation starts fresh from the point at which a new groupByType is received.

aggregateInstruction
{groupByFields:[session],groupByType:bySession,action:add}
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:add}

dataToAggregate
{session:1,account:1,value:100}
{session:2,account:1,value:200}
{session:1,account:2,value:400}
{session:1,account:1,value:800}


streamReadyToAggregate
{session:1,account:1,value:100,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:1,value:100,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}
{session:2,account:1,value:200,groupByFields:[session],groupByType:bySession,aggregationKey:'2'}
{session:2,account:1,value:200,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'2|1'}
{session:1,account:2,value:400,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:2,value:400,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|2'}
{session:1,account:1,value:800,groupByFields:[session],groupByType:bySession,aggregationKey:'1'}
{session:1,account:1,value:800,groupByFields:[session,account],groupByType:bySessionAndAccount,aggregationKey:'1|1'}

aggregateInstruction
{groupByFields:[session,account],groupByType:bySessionAndAccount,action:delete}
{groupByFields:[account],groupByType:byAccount,action:add}

dataToAggregate
{session:3,account:1,value:1600}
{session:3,account:2,value:3200}

streamReadyToAggregate
{session:3,account:1,value:1600,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:1,value:1600,groupByFields:[account],groupByType:byAccount,aggregationKey:'1'}
{session:3,account:2,value:3200,groupByFields:[session],groupByType:bySession,aggregationKey:'3'}
{session:3,account:2,value:3200,groupByFields:[account],groupByType:byAccount,aggregationKey:'2'}
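As a rough illustration of what happens downstream of the keyBy, here is a plain-Java sketch (no Flink dependency; the class and method names are hypothetical) that sums the value field of the streamReadyToAggregate records above per key, the way per-key state would after a keyBy. One assumption worth calling out: it keys on groupByType plus aggregationKey rather than aggregationKey alone, because in this sample aggregationKey '1' means session 1 under bySession but account 1 under byAccount.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for the keyed aggregation that follows keyBy in the pipeline.
// Each row is {groupByType, aggregationKey, value}, taken from streamReadyToAggregate.
final class KeyedSums {

    /** Sums values per groupByType + aggregationKey, mimicking per-key state after a keyBy. */
    static Map<String, Integer> sum(String[][] enriched) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String[] row : enriched) {
            // Include groupByType in the key: aggregationKey '1' means session 1 under
            // bySession but account 1 under byAccount, and the two must not share state.
            String key = row[0] + ":" + row[1];
            totals.merge(key, Integer.parseInt(row[2]), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] enriched = {
            {"bySession", "1", "100"}, {"bySessionAndAccount", "1|1", "100"},
            {"bySession", "2", "200"}, {"bySessionAndAccount", "2|1", "200"},
            {"bySession", "1", "400"}, {"bySessionAndAccount", "1|2", "400"},
            {"bySession", "1", "800"}, {"bySessionAndAccount", "1|1", "800"},
            // after bySessionAndAccount is deleted and byAccount is added
            {"bySession", "3", "1600"}, {"byAccount", "1", "1600"},
            {"bySession", "3", "3200"}, {"byAccount", "2", "3200"},
        };
        System.out.println(sum(enriched));
    }
}
```

With the sample records, bySession:1 totals 1300 and byAccount:1 totals 1600; keyed on aggregationKey alone, those rows would be combined into a single total of 2900.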




RE: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
You don’t have to add keyBys at runtime. You change the value of aggregationKey at runtime.
Some records may appear several times, with different fields extracted into aggregationKey. The dynamic building of the grouping is really done by the flatMap.
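A minimal sketch of that flatMap step, assuming records are simple field-name-to-value maps and that field values are joined with '|' as in the sample data. The names AggregationKeys, aggregationKey and keysFor are hypothetical, and no Flink API is used: each active grouping yields one key, so a single record fans out into several keyed copies while the keyBy itself never changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the key derivation done inside a flatMap such as
// AddAggregationKeyAndDescriptor. No Flink classes are used.
final class AggregationKeys {

    /** Joins the values of the given fields with '|', e.g. [session, account] -> "1|2". */
    static String aggregationKey(Map<String, Integer> record, List<String> groupByFields) {
        List<String> parts = new ArrayList<>();
        for (String field : groupByFields) {
            Integer value = record.get(field);
            if (value == null) {
                throw new IllegalArgumentException("record has no field: " + field);
            }
            parts.add(String.valueOf(value));
        }
        return String.join("|", parts);
    }

    /** One key per active grouping: the same record fans out to several aggregations. */
    static List<String> keysFor(Map<String, Integer> record, List<List<String>> activeGroupings) {
        List<String> keys = new ArrayList<>();
        for (List<String> fields : activeGroupings) {
            keys.add(aggregationKey(record, fields));
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Integer> record = Map.of("session", 1, "account", 2, "value", 400);
        // Groupings active in the first phase of the sample data
        List<List<String>> groupings = List.of(List.of("session"), List.of("session", "account"));
        System.out.println(keysFor(record, groupings)); // prints [1, 1|2]
    }
}
```

In an actual FlatMapFunction, the loop in keysFor would instead call out.collect() once per grouping, emitting a copy of the record with aggregationKey set.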



Re: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by M Singh <ma...@yahoo.com>.
Thanks Edward for your response.
The problem I have is that I am not sure how to add or remove keyBys at run time, since the Flink topology is based on them (as Caizhi mentioned).
I believe we can change the single keyBy in your example, but not add or remove keyBys.
Please let me know if I have missed anything.



RE: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by "Colletta, Edward" <Ed...@FMR.COM>.
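A general pattern for dynamically adding new aggregations could be something like this:

        BroadcastStream<AggregationInstructions> broadcastStream = aggregationInstructions
            .broadcast(broadcastStateDescriptor);

        // A KeyedBroadcastProcessFunction requires dataToAggregate to be a KeyedStream;
        // for a non-keyed DataStream, JoinFunction would be a BroadcastProcessFunction instead.
        DataStream<DataToAggregateEnrichedWithAggregationInstructions> streamReadyToAggregate = dataToAggregate
            .connect(broadcastStream)
            .process(new JoinFunction())
            .flatMap(new AddAggregationKeyAndDescriptor())
            // getAggregationKey() is an assumed getter on the enriched record type
            .keyBy(record -> record.getAggregationKey());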

Where

  *   aggregationInstructions is a stream describing which fields to aggregate by. It might contain a List<String> of the field names and another field describing what the aggregation is doing. Example: groupByFields=['sourceIp','sourcePort','destIp','destPort'], groupingType='bySession', action='Add' or 'Delete'
  *   JoinFunction is a KeyedBroadcastProcessFunction which adds the groupByFields and groupingType to each message in the dataToAggregate stream and possibly deletes groupings from state. (It is a user-defined class, not Flink's own org.apache.flink.api.common.functions.JoinFunction interface, which has the same simple name.)
  *   AddAggregationKeyAndDescriptor is a FlatMapFunction which adds an aggregationKey to each record based on the value of groupByFields

Because of the flatMap, one message may be emitted several times with different values of aggregationKey, so it can belong to multiple aggregations.
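A sketch of the broadcast-state bookkeeping that a function like JoinFunction might do, written as plain Java so it runs without Flink. The Instruction shape mirrors the fields used in this thread (groupByFields, groupByType, action); GroupingRegistry and its methods are hypothetical. In Flink, the map would typically be the broadcast state obtained through broadcastStateDescriptor (a MapStateDescriptor), updated in processBroadcastElement() and read, read-only, in processElement().

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the broadcast-side handling in a function like JoinFunction.
// In Flink, 'active' would live in broadcast state; here it is an ordinary map.
final class GroupingRegistry {

    /** Hypothetical instruction shape, mirroring the fields in the thread's sample data. */
    static final class Instruction {
        final List<String> groupByFields;
        final String groupByType;
        final String action; // "add" or "delete"

        Instruction(List<String> groupByFields, String groupByType, String action) {
            this.groupByFields = groupByFields;
            this.groupByType = groupByType;
            this.action = action;
        }
    }

    // groupByType -> groupByFields, kept in insertion order for readable output
    private final Map<String, List<String>> active = new LinkedHashMap<>();

    /** Applies an add/delete instruction, as processBroadcastElement() might. */
    void apply(Instruction instruction) {
        switch (instruction.action) {
            case "add":
                active.put(instruction.groupByType, instruction.groupByFields);
                break;
            case "delete":
                active.remove(instruction.groupByType);
                break;
            default:
                throw new IllegalArgumentException("unknown action: " + instruction.action);
        }
    }

    /** Snapshot of the groupings a data record should currently be enriched with. */
    Map<String, List<String>> activeGroupings() {
        return new LinkedHashMap<>(active);
    }

    public static void main(String[] args) {
        GroupingRegistry registry = new GroupingRegistry();
        registry.apply(new Instruction(List.of("session"), "bySession", "add"));
        registry.apply(new Instruction(List.of("session", "account"), "bySessionAndAccount", "add"));
        registry.apply(new Instruction(List.of("session", "account"), "bySessionAndAccount", "delete"));
        registry.apply(new Instruction(List.of("account"), "byAccount", "add"));
        System.out.println(registry.activeGroupings()); // prints {bySession=[session], byAccount=[account]}
    }
}
```

Keying the map by groupByType means a delete only needs the type, and an add for an existing type simply replaces its field list.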




Re: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by M Singh <ma...@yahoo.com>.
Hi Caizhi:
Thanks for your reply.
I need to aggregate streams based on dynamic groupings. Not all of the groupings (keyBy) are known up front; they can be added or removed after the streaming application has started, and I don't want to restart the application or change the code. So I wanted to find out what options exist to achieve this functionality. Please let me know if you have any advice or recommendations.
Thanks
  

Re: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Adding/removing keyed streams will change the topology graph of the job.
Currently it is not possible to do so without restarting the job and as far
as I know there is no existing framework/pattern to achieve this.

By the way, why do you need this functionality? Could you elaborate more on
your use case?
