Posted to user@flink.apache.org by ZalaCheung <gz...@corp.netease.com> on 2017/07/24 11:16:53 UTC

Is that possible for flink to dynamically read and change configuration?

Hi all, 

I am trying to implement an anomaly detection algorithm on Flink, essentially a Map operator that detects anomalies in time series.
First, I want to read the configuration (such as which Kafka source host to read the data stream from and which sink address to write to) from MongoDB. The configuration also contains the system metric I want to monitor.

What I did was read the configuration from MongoDB and set it as the Flink configuration.

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
JSONObject jsonConfiguration = readConfiguration();
conf.setInteger("period",jsonConfiguration.getInt("period"));
conf.setDouble("percentage",jsonConfiguration.getDouble("percentage"));
conf.setDouble("metric",jsonConfiguration.getDouble("metric"));
see.getConfig().setGlobalJobParameters(conf);
The readConfiguration() method reads the configuration from MongoDB.

As the code above shows, I set globalJobParameters so that all my workers share these parameters, including the metric I want to monitor. At some point, though, I may want to change the monitored metric. One possible way, I think, is to re-read the configuration dynamically (or periodically) and reset globalJobParameters so that the running Flink program switches to the new metric. Is that possible?

Thanks
Desheng Zhang



Re: Is that possible for flink to dynamically read and change configuration?

Posted by Aljoscha Krettek <al...@apache.org>.
Just some clarification: Flink state is never shared between different parallel operator instances. If you want to make those changes available to all parallel instances of the operation you have to broadcast the control stream, i.e. control.broadcast().
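
To see why the control record has to be broadcast, here is a minimal plain-Java sketch. It does not use the Flink API: Subtask and BroadcastDemo are hypothetical names that model two parallel instances of the CoFlatMap operator, each holding its own copy of the state. Delivering the control record to a single instance leaves the other one uninitialized; delivering it to every instance is the effect control.broadcast() is meant to have.

```java
// Plain-Java model, not Flink API: each Subtask stands in for one parallel
// instance of the CoFlatMap operator and owns its own copy of the state.
class Subtask {
    private String metric; // per-instance state, invisible to other subtasks

    // Models flatMap2: a control record updates this instance only.
    void onControl(String newMetric) {
        metric = newMetric;
    }

    // Models flatMap1: a data record reads this instance's state only.
    String onData(String record) {
        return metric == null ? "Not initialized" : "initialized: " + metric;
    }
}

class BroadcastDemo {
    static Subtask[] subtasks(int parallelism) {
        Subtask[] result = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            result[i] = new Subtask();
        }
        return result;
    }

    // Without broadcast: the control record reaches exactly one subtask.
    static void sendToOne(Subtask[] subtasks, int target, String metric) {
        subtasks[target].onControl(metric);
    }

    // With broadcast: every subtask receives its own copy of the control record.
    static void sendToAll(Subtask[] subtasks, String metric) {
        for (Subtask s : subtasks) {
            s.onControl(metric);
        }
    }

    public static void main(String[] args) {
        Subtask[] plain = subtasks(2);
        sendToOne(plain, 0, "heyjude");
        System.out.println(plain[1].onData("x")); // prints "Not initialized"

        Subtask[] broadcast = subtasks(2);
        sendToAll(broadcast, "heyjude");
        System.out.println(broadcast[1].onData("x")); // prints "initialized: heyjude"
    }
}
```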

Best,
Aljoscha

> On 24. Jul 2017, at 17:20, Chesnay Schepler <ch...@apache.org> wrote:
> 
> So I don't know why it doesn't work (it should, afaik), but as a workaround you could maintain
> an ArrayList or similar in your function, and only add/read elements from the ListState in snapshot/initialize state.
> 


Re: Is that possible for flink to dynamically read and change configuration?

Posted by Chesnay Schepler <ch...@apache.org>.
I don't know why it doesn't work (it should, afaik), but as a workaround you could maintain an ArrayList or similar in your function, and only add/read elements from the ListState in snapshotState()/initializeState().
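
A rough sketch of that workaround in plain Java, so that it runs without a Flink dependency. ListStateStandIn is a hypothetical stand-in for Flink's ListState, and MetricFunction only mirrors the method names used in this thread (initializeState, snapshotState, flatMap1, flatMap2); a real function would implement CoFlatMapFunction and CheckpointedFunction instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's ListState<String>; not the real interface.
class ListStateStandIn {
    private final List<String> items = new ArrayList<>();

    void clear() { items.clear(); }
    void add(String value) { items.add(value); }
    Iterable<String> get() { return items; }
}

// Mirrors the shape of the testFunc from this thread.
class MetricFunction {
    // Working copy: read and written on every record.
    private final List<String> metrics = new ArrayList<>();
    // Checkpointed copy: touched only in initializeState/snapshotState.
    private ListStateStandIn checkpointed;

    // On start or restore: load the checkpointed elements into the working copy.
    void initializeState(ListStateStandIn state) {
        checkpointed = state;
        metrics.clear();
        for (String m : state.get()) {
            metrics.add(m);
        }
    }

    // On checkpoint: overwrite the checkpointed copy with the working copy.
    void snapshotState() {
        checkpointed.clear();
        for (String m : metrics) {
            checkpointed.add(m);
        }
    }

    // Control stream: replace the monitored metric in the working copy.
    void flatMap2(String control) {
        metrics.clear();
        metrics.add(control);
    }

    // Data stream: report which metric is currently configured.
    String flatMap1(String record) {
        return metrics.isEmpty() ? "Not initialized" : "initialized: " + metrics.get(0);
    }
}
```

After flatMap2("heyjude"), flatMap1 on the same instance returns "initialized: heyjude", and a fresh instance initialized from the snapshotted state returns the same.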

On 24.07.2017 17:10, ZalaCheung wrote:
> Hi all,
>
> Does anyone have idea about the non-keyed managed state problem below?
> I think all the function in the testFunc class should share the 
> ListState “metrics”. But after I add element to ListState at flatMap2 
> function, I cannot retrieve the element added to ListState.
>
>
> Desheng Zhang
>


Re: Is that possible for flink to dynamically read and change configuration?

Posted by ZalaCheung <gz...@corp.netease.com>.
Hi all,

Does anyone have an idea about the non-keyed managed state problem below?
I think all the functions in the testFunc class should share the ListState “metrics”. But after I add an element to the ListState in the flatMap2 function, I cannot retrieve the element I added.


Desheng Zhang




Re: Is that possible for flink to dynamically read and change configuration?

Posted by ZalaCheung <gz...@corp.netease.com>.
Hi Chesnay,

Thank you very much. I have now dropped the default value for the ListState and tried to use a CoFlatMapFunction with managed state. What surprised me is that the state does not seem to be shared between the two streams.

My test code is shown below.

DataStream<String> result = stream
        .connect(control)
        .flatMap(new testFunc());

public static class testFunc implements CoFlatMapFunction<String,String,String>,CheckpointedFunction{

    private ListState<String> metrics;

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {

    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListStateDescriptor<String> metricsStateDescriptor =
                new ListStateDescriptor<>(
                        "metrics",
                        TypeInformation.of(new TypeHint<String>() {}));
        metrics = functionInitializationContext.getOperatorStateStore().getListState(metricsStateDescriptor);

    }

    @Override
    public void flatMap1(String s, Collector<String> collector) throws Exception {
        String myMetrics = null;
        for(String element:metrics.get()){
            logger.info("element in metric: " + element);
            myMetrics = element;
        }
        if(myMetrics == null){
            logger.info("Not initialized");
        }else {
            logger.info("initialized: " + myMetrics);
        }

    }

    @Override
    public void flatMap2(String s, Collector<String> collector) throws Exception {
        metrics.clear();
        metrics.add(s);
        
        for(String element:metrics.get()){
            logger.info("element in metric: " + element);

        }

    }
}

I connected the two streams (stream and control) and applied the CoFlatMapFunction to them. When I send a string to the control stream, the expected log line is printed:
- element in metric: heyjude
Then I send another string to the first stream, but the log prints:
- Not initialized

I am confused. I successfully receive the message on the control stream and add the string to the ListState, but when I try to read the ListState in flatMap1, I get nothing.

Thanks.
Desheng Zhang



> On Jul 24, 2017, at 21:01, Chesnay Schepler <ch...@apache.org> wrote:
> 
> Hello,
> 
> That's an error in the documentation; only the ValueStateDescriptor has a defaultValue constructor argument.
> 
> Regards,
> Chesnay
> 
> On 24.07.2017 14:56, ZalaCheung wrote:
>> Hi Martin,
>> 
>> Thanks for your advice. That’s really helpful. I am using the push scenario, but I am now having some trouble with the state I want to maintain. For me, the simplest way would be to maintain a ValueState in a CoFlatMapFunction (actually a RichCoFlatMapFunction). But the rich function can only be used on a keyed stream, and for a connected stream, at least in my scenario, I should not use keyBy() (actually, it seems keyBy() is not allowed on a connected stream).
>> 
>> So instead of using a rich function with keyed managed state, I tried to use CheckpointedFunction for my non-keyed state. However, in a CheckpointedFunction I can only use ListState, which only has add() and an iterator method, so I am not sure whether I can simply replace the element in the ListState. What really has me stuck is that I cannot initialize my ListState through the ListStateDescriptor: it says there is no constructor that takes an initial value, even though I saw one in the official documentation.
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>> 
>> @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>>             new ListStateDescriptor<>(
>>                 "buffered-elements",
>>                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>>                 Tuple2.of(0L, 0L));
>> 
>>         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
>> 
>>         if (context.isRestored()) {
>>             for (Tuple2<String, Integer> element : checkpointedState.get()) {
>>                 bufferedElements.add(element);
>>             }
>>         }
>>     }
>> 
>> 
>> But in my code (Flink 1.3.1), it says there’s no constructor with three arguments (the third argument in the example above is the default value). I am really confused.
>> 
>> How can I maintain my state for the CoFlatMap function?
>> 
>> 
>> Thanks
>>  Desheng Zhang
>> 
>> 
>>> On Jul 24, 2017, at 19:44, Martin Eden <martineden131@gmail.com> wrote:
>>> 
>>> Hey Desheng,
>>> 
>>> Some options that come to mind:
>>> - Caveman style: stop and restart the job with the new config.
>>> - Poll scenario: you could build your own thread that periodically loads the config from the DB into a cache accessible to each worker.
>>> - Push scenario: have a config stream (based off of some queue) which you connect to your data stream via the connect operator. In the CoFlatMapFunction that you have to provide, you update Flink state from the config flatMap, read that state from the data flatMap, and pass it along with the data. The specific operator that uses the config can then always get it from the tuple that arrives alongside the data, say in the invoke method of a sink. Example here <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
>>> 
>>> Hope that gives you some ideas,
>>> M
>>> 
>>> 
>>> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesheng@corp.netease.com <ma...@corp.netease.com>> wrote:
>>> Hi all, 
>>> 
>>> I am  now trying to implement a anomaly detection algorithm on Flink, which is actually implement a Map operator to do anomaly detection based on timeseries.
>>> At first I want to read configuration(like which kafka source host to read datastream from and which sink address to write data to ) from mongo db. It contains some system metric  I want to monitor.
>>> 
>>> What I did was read configuration from mongo DB and set as configuration of flink.
>>> 
>>> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>>> Configuration conf = new Configuration();
>>> JSONObject jsonConfiguration = readConfiguration();
>>> conf.setInteger("period",jsonConfiguration.getInt("period"));
>>> conf.setDouble("percentage",jsonConfiguration.getDouble("percentage"));
>>> conf.setDouble(“metric",jsonConfiguration.getDouble(“metric"));
>>> see.getConfig().setGlobalJobParameters(conf);
>>> The “readConfiguration()” method read the configuration from mongoDB.
>>> 
>>> Just like the code I showed above. I set globalJobParameters to let all my workers share these parameters including the metric I want to monitor.But maybe at some point I want to change the metric I want to monitor. I think one possible way is to dynamically(or periodically) read  configuration and reset the globalJobParameters to make the Flink program to change the metric to monitor. Is  that possible?
>>> 
>>> Thanks
>>> Desheng Zhang
>>> 
>>> 
>>> 
>> 
> 


Re: Is that possible for flink to dynamically read and change configuration?

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

That's an error in the documentation; only the ValueStateDescriptor has
a defaultValue constructor argument.

Regards,
Chesnay
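
Following the point above, one way to get a default with ListState is to create the descriptor with the two-argument constructor (name plus type information) and apply the default in code whenever the state is empty. Replacing the single stored element can then be done with clear() followed by add(). The sketch below illustrates only that pattern, in plain Java. SimpleListState, InMemoryListState and SingleValueInListState are hypothetical stand-ins, not Flink classes; SimpleListState merely mirrors the get(), add() and clear() methods so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in mirroring the three list-state methods used here. */
interface SimpleListState<T> {
    Iterable<T> get();
    void add(T value);
    void clear();
}

/** In-memory implementation, only so the sketch can run outside Flink. */
class InMemoryListState<T> implements SimpleListState<T> {
    private final List<T> elements = new ArrayList<>();

    @Override public Iterable<T> get() { return elements; }
    @Override public void add(T value) { elements.add(value); }
    @Override public void clear() { elements.clear(); }
}

/** Keeps at most one value in a list state and supplies the default in code. */
class SingleValueInListState<T> {
    private final SimpleListState<T> state;
    private final T defaultValue;

    SingleValueInListState(SimpleListState<T> state, T defaultValue) {
        this.state = state;
        this.defaultValue = defaultValue;
    }

    /** Returns the stored value, or the default if nothing was stored yet. */
    T current() {
        Iterable<T> stored = state.get();
        // Be defensive: treat both null and an empty iterable as "no value".
        Iterator<T> it = (stored == null) ? null : stored.iterator();
        return (it != null && it.hasNext()) ? it.next() : defaultValue;
    }

    /** Replaces the stored value: drop whatever is there, then add the new one. */
    void replace(T newValue) {
        state.clear();
        state.add(newValue);
    }
}
```

In a CheckpointedFunction, the assumption is that the same current()/replace() logic would be applied to the ListState obtained in initializeState().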

On 24.07.2017 14:56, ZalaCheung wrote:
> Hi Martin,
>
> Thanks for your advice. That’s really helpful. I am using the push
> scenario, but I am now having some trouble with the state I want to
> maintain. For me, the simplest way would be to maintain a ValueState in
> a CoFlatMapFunction (actually a RichCoFlatMapFunction). But ValueState
> in a rich function can only be used on a keyed stream. And for a
> connected stream, at least in my scenario, I should not use the keyBy()
> method (actually, it seems that keyBy() is not allowed on a connected
> stream).
>
> So instead of using a rich function with keyed managed state, I tried
> to use CheckpointedFunction for my non-keyed state. However, in a
> CheckpointedFunction I can only use ListState, which only offers add()
> and an iterator over its elements. I am not sure whether I can simply
> replace an element in the ListState. What really has me stuck is that I
> cannot initialize my ListState through the ListStateDescriptor: it says
> there is no constructor that takes an initial value, even though I saw
> exactly that in the official documentation:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>
> @Override
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>         new ListStateDescriptor<>(
>             "buffered-elements",
>             TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>             Tuple2.of(0L, 0L));
>
>     checkpointedState = context.getOperatorStateStore().getListState(descriptor);
>
>     if (context.isRestored()) {
>         for (Tuple2<String, Integer> element : checkpointedState.get()) {
>             bufferedElements.add(element);
>         }
>     }
> }
>
>
> But in my code (Flink 1.3.1), it says there’s no constructor taking
> three arguments (the third argument in the example above is the default
> value). I am really confused.
>
> How can I maintain my state for the CoFlatMap function?
>
>
> Thanks
>  Desheng Zhang
>
>
>


Re: Is that possible for flink to dynamically read and change configuration?

Posted by ZalaCheung <gz...@corp.netease.com>.
Hi Martin,

Thanks for your advice. That’s really helpful. I am using the push scenario, but I am now having some trouble with the state I want to maintain. For me, the simplest way would be to maintain a ValueState in a CoFlatMapFunction (actually a RichCoFlatMapFunction). But ValueState in a rich function can only be used on a keyed stream. And for a connected stream, at least in my scenario, I should not use the keyBy() method (actually, it seems that keyBy() is not allowed on a connected stream).

So instead of using a rich function with keyed managed state, I tried to use CheckpointedFunction for my non-keyed state. However, in a CheckpointedFunction I can only use ListState, which only offers add() and an iterator over its elements. I am not sure whether I can simply replace an element in the ListState. What really has me stuck is that I cannot initialize my ListState through the ListStateDescriptor: it says there is no constructor that takes an initial value, even though I saw exactly that in the official documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

@Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                Tuple2.of(0L, 0L));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }


But in my code (Flink 1.3.1), it says there’s no constructor taking three arguments (the third argument in the example above is the default value). I am really confused.

How can I maintain my state for the CoFlatMap function?


Thanks
 Desheng Zhang
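
A minimal sketch of the push scenario discussed in this thread, written in plain Java so it runs without Flink. ConfigTagger is a hypothetical class that only mirrors the two-input shape of a CoFlatMapFunction (flatMap1 for the control input, flatMap2 for the data input); MonitorConfig and TaggedValue are likewise made-up types, with "period" and "percentage" borrowed from the configuration keys in the original post. A java.util.function.Consumer stands in for Flink's Collector.

```java
import java.util.function.Consumer;

/** Hypothetical config record; field names follow the keys used in the thread. */
class MonitorConfig {
    final int period;
    final double percentage;

    MonitorConfig(int period, double percentage) {
        this.period = period;
        this.percentage = percentage;
    }
}

/** A data element together with the config that was current when it arrived. */
class TaggedValue {
    final double value;
    final MonitorConfig config;

    TaggedValue(double value, MonitorConfig config) {
        this.value = value;
        this.config = config;
    }
}

/** Mirrors the two-input shape of a CoFlatMapFunction without depending on Flink. */
class ConfigTagger {
    // Latest config seen on the control input; starts with an initial default.
    private MonitorConfig current;

    ConfigTagger(MonitorConfig initial) {
        this.current = initial;
    }

    /** Control input: remember the newest config and emit nothing. */
    void flatMap1(MonitorConfig update, Consumer<TaggedValue> out) {
        current = update;
    }

    /** Data input: attach the current config and pass the element downstream. */
    void flatMap2(double value, Consumer<TaggedValue> out) {
        out.accept(new TaggedValue(value, current));
    }
}
```

In an actual job the assumption is that the two streams would be wired with something like data.connect(control.broadcast()).flatMap(...), since each parallel instance keeps its own copy of the field, and that the field would additionally be stored as operator state if it has to survive a restart.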




Re: Is that possible for flink to dynamically read and change configuration?

Posted by Martin Eden <ma...@gmail.com>.
Hey Desheng,

Some options that come to mind:
- Cave man style: Stop and restart job with new config.
- Poll scenario: You could build your own thread that periodically loads
from the db into a cache accessible to each worker.
- Push scenario: Have a config stream (based off of some queue) which you
connect to your data stream via the connect operator. In the
CoFlatMapFunction that you have to provide, you update Flink state from the
config flatMap, read that state from the data flatMap, and pass it along
with the data. The specific operator that uses the config can then always
get it from the tuple that arrives with the data, say in the invoke method
of a sink. Example here:
<https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>

Hope that gives u some ideas,
M
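
The poll scenario above could be sketched roughly as follows. This is plain Java rather than Flink code, and PollingConfigCache is a hypothetical helper: the loader passed in stands for whatever reads the configuration from the database, and the refresh period is an arbitrary choice. A failed load keeps the previous value, because an exception escaping a task scheduled with scheduleAtFixedRate would cancel all later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical per-worker cache that reloads a config value on a timer. */
class PollingConfigCache<T> implements AutoCloseable {
    private final Supplier<T> loader;
    private final AtomicReference<T> latest;
    private final ScheduledExecutorService scheduler;

    PollingConfigCache(Supplier<T> loader, long periodMillis) {
        this.loader = loader;
        // Load once up front so get() never returns an unloaded value.
        this.latest = new AtomicReference<>(loader.get());
        // Daemon thread so the poller does not keep the JVM alive.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "config-poller");
            thread.setDaemon(true);
            return thread;
        });
        scheduler.scheduleAtFixedRate(
            this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Reloads the value; on failure the previously loaded value is kept. */
    void refresh() {
        try {
            latest.set(loader.get());
        } catch (RuntimeException e) {
            // Swallow the error so the scheduler keeps running.
        }
    }

    /** Returns the most recently loaded value. */
    T get() {
        return latest.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Inside a Flink job, one plausible place for this would be a rich function that creates the cache in open() and closes it in close(), so that every parallel instance polls on its own.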

