Posted to dev@flink.apache.org by xu <xu...@163.com> on 2017/07/04 13:56:23 UTC

refactor StreamConfig

Hi all,
     I am sorry that I started working on StreamConfig (https://github.com/apache/flink/pull/4241) before discussing it here, since it may conflict with others' work.


     Motivation:
         A task contains one or more chained operators, yet the configs of the operators and of the task are all stored in StreamConfig. For example, when an operator is set up with the StreamConfig, it can see the interface for physicalEdges or chained.task.configs, which is confusing. Similarly, a streamTask should not see the interface for chain.index.
         So we need to separate OperatorConfig from StreamConfig. A streamTask is initialized with the streamConfig, extracts the operatorConfigs from it, and builds a streamOperator from each operatorConfig.
    OperatorConfig: used to set up the streamOperator; it holds only information that belongs to the streamOperator. It contains:
       1)  operator information: name, id
       2)  streamOperator
       3)  input serializer
       4)  output edges and serializers
       5)  chain.index
       6)  state.key.serializer


     StreamConfig: for the streamTask to use:
       1) in.physical.edges
       2) out.physical.edges
       3) chained OperatorConfigs
       4) execution environment: checkpoint, state.backend and so on... 
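
As an illustration of the split described above, here is a minimal Java sketch. The class shapes, field types, and method names are assumptions made for this example (edges are plain strings and only a few of the listed fields are shown); they are not taken from the PR or from Flink's actual classes.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical per-operator config: a plain serializable class with fields.
final class OperatorConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String operatorName;
    private final int operatorId;
    private final int chainIndex;

    OperatorConfig(String operatorName, int operatorId, int chainIndex) {
        this.operatorName = operatorName;
        this.operatorId = operatorId;
        this.chainIndex = chainIndex;
    }

    String getOperatorName() { return operatorName; }
    int getOperatorId() { return operatorId; }
    int getChainIndex() { return chainIndex; }
}

// Hypothetical task-level config: physical edges plus the chained operator configs.
final class StreamConfig {
    private final List<String> inPhysicalEdges = new ArrayList<>();
    private final List<String> outPhysicalEdges = new ArrayList<>();
    private final List<OperatorConfig> chainedOperatorConfigs = new ArrayList<>();

    void addInPhysicalEdge(String edge) { inPhysicalEdges.add(edge); }
    void addOutPhysicalEdge(String edge) { outPhysicalEdges.add(edge); }
    void addChainedOperatorConfig(OperatorConfig config) { chainedOperatorConfigs.add(config); }

    List<String> getInPhysicalEdges() { return Collections.unmodifiableList(inPhysicalEdges); }
    List<String> getOutPhysicalEdges() { return Collections.unmodifiableList(outPhysicalEdges); }

    // The task extracts one OperatorConfig per chained operator from here.
    List<OperatorConfig> getChainedOperatorConfigs() {
        return Collections.unmodifiableList(chainedOperatorConfigs);
    }
}
```

With this shape, the task reads the edges and the chained configs, while each operator only ever receives its own OperatorConfig.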
 
    Proposed Change
      I propose two overall changes:
       1) Build the jobGraph from the streamGraph
       2) Set up the StreamOperator with an operatorConfig, so the setup interface needs to change


    (1) Build jobGraph from streamGraph
       When building, we first get the operatorConfig of every streamNode, and then put the operatorConfigs of the streamNodes into a streamConfig when we chain them into a jobVertex.
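
The grouping step in (1) could look roughly like the following Java sketch. Everything here is hypothetical: StreamNodeSketch, its chainId field, and collectPerVertex are invented for illustration, and an operator's config is reduced to its name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a stream node; chainId marks the jobVertex it is chained into.
final class StreamNodeSketch {
    final String operatorName;
    final int chainId;

    StreamNodeSketch(String operatorName, int chainId) {
        this.operatorName = operatorName;
        this.chainId = chainId;
    }
}

final class JobGraphBuildSketch {
    // Collects the per-operator configs (here just names) of all nodes that are
    // chained into the same jobVertex, preserving the order of the input list.
    static Map<Integer, List<String>> collectPerVertex(List<StreamNodeSketch> nodes) {
        Map<Integer, List<String>> perVertex = new LinkedHashMap<>();
        for (StreamNodeSketch node : nodes) {
            perVertex.computeIfAbsent(node.chainId, id -> new ArrayList<>())
                     .add(node.operatorName);
        }
        return perVertex;
    }
}
```

Each map entry then corresponds to one streamConfig holding the operatorConfigs of one chain.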


    (2) StreamOperator setup with OperatorConfig
       An OperatorConfig is provided instead of the streamConfig when the streamOperator is set up. Following the advice of StephanEwen, OperatorConfig does not need a Map of "configKey" to values; it is just a serializable class with the respective fields. StreamConfig still relies on an underlying Configuration, because the streamConfig is shipped through its underlying configuration.

      Others may have already thought about this, and maybe someone is already working on it, so I would appreciate your advice.


      Thanks a lot for replying and Best Regards.


      JiPing

Re: [DISCUSS]refactor StreamConfig

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Yes, that sounds very good! I like the idea of having an interface that is a view on some of the settings. We could maybe improve the naming, because to me it is not completely clear what the difference between OperatorSettings and OperatorProperties is (I mean just the name, the functionality you defined very well!). This is easy to change though.

I think we arrived at a good solution now, thanks!

Best,
Aljoscha

> On 6. Jul 2017, at 16:55, Xu Pingyong <xu...@163.com> wrote:
> 
> Hi Aljoscha:
> 
>   Great, I couldn't agree more. So I will introduce OperatorSettings and OperatorProperties.
> 
> 
>   StreamTaskConfig relies on the underlying configuration and is provided for the streamTask to use. It contains:
>     1) in.physical.edges
>     2) out.physical.edges
>     3) chained OperatorSettings
>     4) execution environment: checkpoint, state.backend and so on...
> 
> 
>   OperatorSettings is serialisable and stores things that are tied to one operator within the chain. It is provided for the streamTask to build an operator. It contains:
>       1)  operator information: name, id
>       2)  streamOperator
>       3)  input serializer
>       4)  output edges and serializers
>       5)  is.chain.start, is.chain.end
>       6)  state.key.serializer
> 
> 
>   OperatorProperties can be an interface that exposes a view of a few things in OperatorSettings. It is provided for an operator to set itself up with. It contains:
>       1)  operator information: name, id
>       2)  input serializer
>       3)  is.chain.start, is.chain.end  (exists now, may be moved later)
>       4)  state.key.serializer
> 
> 
>    What do you think?
> 
> 
> Best Regards!
> Xu Pingyong


Re:[DISCUSS]refactor StreamConfig

Posted by Xu Pingyong <xu...@163.com>.
Hi Aljoscha:
  
   Great, I couldn't agree more. So I will introduce OperatorSettings and OperatorProperties.


   StreamTaskConfig relies on the underlying configuration and is provided for the streamTask to use. It contains:
     1) in.physical.edges
     2) out.physical.edges
     3) chained OperatorSettings
     4) execution environment: checkpoint, state.backend and so on... 


   OperatorSettings is serialisable and stores things that are tied to one operator within the chain. It is provided for the streamTask to build an operator. It contains:
       1)  operator information: name, id
       2)  streamOperator
       3)  input serializer.
       4)  output edges and serializers.
       5)  is.chain.start, is.chain.end
       6)  state.key.serializer


   OperatorProperties can be an interface that exposes a view of a few things in OperatorSettings. It is provided for an operator to set itself up with. It contains:
       1)  operator information: name, id
       2)  input serializer
       3)  is.chain.start, is.chain.end  (exists now, may be moved later)
       4)  state.key.serializer
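
One way to read the OperatorSettings / OperatorProperties idea is as a class plus a narrower read-only interface. The Java sketch below is only an assumption about how that could look: the method names, the string-typed output edges, and OperatorSketch are invented for the example and do not come from the PR.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical read-only view: the only part an operator gets to see.
interface OperatorProperties {
    String getOperatorName();
    int getOperatorId();
}

// Hypothetical full settings used by the task; the view is a subset of it.
final class OperatorSettings implements OperatorProperties, Serializable {
    private static final long serialVersionUID = 1L;

    private final String operatorName;
    private final int operatorId;
    private final ArrayList<String> outputEdges; // task-only, not part of the view

    OperatorSettings(String operatorName, int operatorId, List<String> outputEdges) {
        this.operatorName = operatorName;
        this.operatorId = operatorId;
        this.outputEdges = new ArrayList<>(outputEdges);
    }

    @Override
    public String getOperatorName() { return operatorName; }

    @Override
    public int getOperatorId() { return operatorId; }

    // Visible to the task that builds the chain, but not through OperatorProperties.
    List<String> getOutputEdges() { return Collections.unmodifiableList(outputEdges); }
}

// Hypothetical operator: its setup signature accepts only the narrow view.
final class OperatorSketch {
    private String name;

    void setup(OperatorProperties properties) {
        this.name = properties.getOperatorName();
    }

    String getName() { return name; }
}
```

The task keeps the OperatorSettings object and passes the same instance to setup(...), where the static type hides the output edges from the operator.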


    What do you think?


 Best Regards!
Xu Pingyong

Re: refactor StreamConfig

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Yes, the fact that the operator can see isChainStart() and isChainEnd() is not good, in my opinion. These seem to be implementation details that an operator should not be aware of. For now it’s ok but maybe we can fix that later.

Regarding output edges and serialisers, I think it might be necessary to differentiate between an operator config that the operator “can see”, which would be very minimal, and an operator config that the task uses to set up the chain and other stuff. The latter would store things that are tied to one operator within the chain but that the operator itself must not be concerned with. What do you think?

Best,
Aljoscha

> On 5. Jul 2017, at 07:39, Xu Pingyong <xu...@163.com> wrote:
> 
> Hi Aljoscha:
> 
> 
>    Let me sum up my thoughts:
>    1. Rename StreamConfig to StreamTaskConfig.
>    2. OperatorConfig can be made serialisable. If StreamTaskConfig is also made serialisable, it cannot be deserialized when it is passed to the jobManager, which does not depend on "flink-streaming-java".
>    3. getChainIndex() is used only in OperatorConfig.toString(), so it can be removed. However, isChainStart() and isChainEnd() are used in AbstractStreamOperator.setup(...).
> 
>    However, I am not sure whether some properties belong in StreamTaskConfig or OperatorConfig. For example, the input serializer is used not only in the operator but also in OperatorChain. Likewise, output edges and serialisers are only used in OperatorChain now, but might the operator need to see and use them later? The properties in question are:
>    2)  streamOperator
>    4)  output edges and serializers
> 
>   What do you think?
> 
> 
>    Best Regards!
> Xu Pingyong


Re:refactor StreamConfig

Posted by Xu Pingyong <xu...@163.com>.
Hi Aljoscha:


    Let me sum up my thoughts:
    1. Rename StreamConfig to StreamTaskConfig.
    2. OperatorConfig can be made serialisable. If StreamTaskConfig is also made serialisable, it cannot be deserialized when it is passed to the jobManager, which does not depend on "flink-streaming-java".
    3. getChainIndex() is used only in OperatorConfig.toString(), so it can be removed. However, isChainStart() and isChainEnd() are used in AbstractStreamOperator.setup(...).
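
The concern in point 2 can be pictured with plain JDK serialization: if the config object travels as opaque bytes inside a generic key-value configuration, a module that only forwards those bytes never needs the class on its classpath, while the task that reads them back does. The following Java sketch is an assumption-laden illustration; TaskSettings and OpaqueConfigSketch are hypothetical names, and a Map<String, byte[]> stands in for the underlying configuration rather than Flink's Configuration API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical task-level settings class, known only to client and task.
final class TaskSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    final String taskName;

    TaskSettings(String taskName) {
        this.taskName = taskName;
    }
}

final class OpaqueConfigSketch {
    // Client side: turn the settings object into bytes before shipping it.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Task side: only here must the settings class be loadable.
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("settings class not on the classpath", e);
        }
    }
}
```

Any component in between can copy the byte array from one map to another without ever calling fromBytes.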
    
    However, I am not sure whether some properties belong in StreamTaskConfig or OperatorConfig. For example, the input serializer is used not only in the operator but also in OperatorChain. Likewise, output edges and serialisers are only used in OperatorChain now, but might the operator need to see and use them later? The properties in question are:
    2)  streamOperator
    4)  output edges and serializers
    
   What do you think?


    Best Regards!
Xu Pingyong
   








At 2017-07-05 11:02:56, "Xu Pingyong" <xu...@163.com> wrote:
>Hi Aljoscha:
>
>
>Yes, I agree with you that an operator should not see output edges and serialisers. getChainIndex() is used only in OperatorConfig.toString(), so it can be removed. However, isChainStart() and isChainEnd() are used in AbstractStreamOperator.setup(...).
>
>
>But I think Stephan only meant changing OperatorConfig to be serialisable. If StreamConfig is also made serialisable, it still needs to be serialized into the Configuration, which underlies it today and flows across modules.
>
>
>Do you agree with my understanding?
>
>
>Best Regards!
>
>
>Xu Pingyong

Re: refactor StreamConfig

Posted by Xu Pingyong <xu...@163.com>.
Hi Aljoscha:


Yes, I agree with you that an operator should not see output edges and serialisers. getChainIndex() is used only in OperatorConfig.toString(), so it can be removed. However, isChainStart() and isChainEnd() are used in AbstractStreamOperator.setup(...).


But I think Stephan only meant changing OperatorConfig to be serialisable. If StreamConfig is also made serialisable, it still needs to be serialized into the Configuration, which underlies it today and flows across modules.


Do you agree with my understanding?


Best Regards!


Xu Pingyong
  






At 2017-07-05 00:01:34, "Aljoscha Krettek" <al...@apache.org> wrote:
>Hi,
>
>Yes, but I think what Stephan was hinting at was to change both of them to be serialisable when already working on this.
>
>I think input serialiser is fine to have in OperatorConfig, you’re right! I don’t see getChainIndex() used anywhere in the code, though. And the output edges and serialisers also look like they should not be visible to the operator.
>
>What do you think?
>
>Best,
>Aljoscha

Re: refactor StreamConfig

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Yes, but I think what Stephan was hinting at was to change both of them to be serialisable when already working on this.

I think input serialiser is fine to have in OperatorConfig, you’re right! I don’t see getChainIndex() used anywhere in the code, though. And the output edges and serialisers also look like they should not be visible to the operator.

What do you think?

Best,
Aljoscha

> On 4. Jul 2017, at 17:52, xu <xu...@163.com> wrote:
> 
> Hi Aljoscha:
>    Thanks a lot for your advice.
> 
> 
>    I think I do not need separate steps, because all I do is introduce OperatorConfig and move the fields. StreamConfig still relies on an underlying Configuration, which flows from the client to the jobmanager and then to the task.
> 
> 
>    The following configs are used in an operator now:
>    3) input serializer is used in AsyncWaitOperator.class
>    5) chain.index is used in AbstractStreamOperator.setup(...)
> 
> 
>    However, what I put in the OperatorConfig is every config that belongs to the operator: not only what the operator itself uses now, but also what the streamTask uses to build the operator. With OperatorConfig, an operator cannot see configs that belong to others.
> 
> 
>   Best Regards!
>   JiPing


Re:Re: refactor StreamConfig

Posted by xu <xu...@163.com>.
Hi Aljoscha:
    Thanks a lot for your advice.


    I think I do not need separate steps, because all I do is introduce OperatorConfig and move the fields. StreamConfig still relies on an underlying Configuration, which flows from the client to the jobmanager and then to the task.


    The following configs are used in an operator now:
    3) input serializer is used in AsyncWaitOperator.class
    5) chain.index is used in AbstractStreamOperator.setup(...)


    However, what I put in the OperatorConfig is every config that belongs to the operator: not only what the operator itself uses now, but also what the streamTask uses to build the operator. With OperatorConfig, an operator cannot see configs that belong to others.


   Best Regards!
   JiPing

Re: refactor StreamConfig

Posted by Aljoscha Krettek <al...@apache.org>.
I think the proposed changes are good, I just wanted to make sure that they don’t interfere with what other people are doing.

I also proposed these steps on the Github PR:
Also, for actually doing the changes I suggest separate steps, i.e. separate commits, possibly with separate PRs, to make reviewing easier and the changes more isolated:

 - Rename StreamConfig to StreamTaskConfig and make it serialisable, instead of relying on an underlying Configuration. This means that the StreamTaskConfig itself has fields for storing settings.
 - Introduce OperatorConfig and move only those fields that the operator should see from StreamTaskConfig to OperatorConfig. Initialize the operator with an OperatorConfig.

Regarding what to put in the OperatorConfig and what in the StreamTaskConfig: why are these still in the OperatorConfig?
       2)  streamOperator
       3)  input serializer.
       4)  output edges and serializers.
       5)  chain.index

I think only the StreamTask, which is responsible for building the OperatorChain, needs to have that information.

Best,
Aljoscha


> On 4. Jul 2017, at 15:56, xu <xu...@163.com> wrote:
> 
> Hi all,
>      I am sorry that I started working on StreamConfig (https://github.com/apache/flink/pull/4241) before discussing it here, since it may conflict with others' work.
> 
>      Motivation:
>          A task contains one or more chained operators, yet the configs of the operators and of the task are all stored in StreamConfig. For example, when an operator is set up with the StreamConfig, it can see the interface for physicalEdges or chained.task.configs, which is confusing. Similarly, a streamTask should not see the interface for chain.index.
>          So we need to separate OperatorConfig from StreamConfig. A streamTask is initialized with the streamConfig, extracts the operatorConfigs from it, and builds a streamOperator from each operatorConfig.
> 
>     OperatorConfig: used to set up the streamOperator; it holds only information that belongs to the streamOperator. It contains:
>        1)  operator information: name, id
>        2)  streamOperator
>        3)  input serializer
>        4)  output edges and serializers
>        5)  chain.index
>        6)  state.key.serializer
> 
>      StreamConfig: for the streamTask to use:
>        1) in.physical.edges
>        2) out.physical.edges
>        3) chained OperatorConfigs
>        4) execution environment: checkpoint, state.backend and so on...
> 
>     Proposed Change
>       I propose two overall changes:
>        1) Build the jobGraph from the streamGraph
>        2) Set up the StreamOperator with an operatorConfig, so the setup interface needs to change
> 
>     (1) Build jobGraph from streamGraph
>        When building, we first get the operatorConfig of every streamNode, and then put the operatorConfigs of the streamNodes into a streamConfig when we chain them into a jobVertex.
> 
>     (2) StreamOperator setup with OperatorConfig
>        An OperatorConfig is provided instead of the streamConfig when the streamOperator is set up. Following the advice of StephanEwen, OperatorConfig does not need a Map of "configKey" to values; it is just a serializable class with the respective fields. StreamConfig still relies on an underlying Configuration, because the streamConfig is shipped through its underlying configuration.
> 
>       Others may have already thought about this, and maybe someone is already working on it, so I would appreciate your advice.
> 
>       Thanks a lot for replying and Best Regards.
> 
>       JiPing
> 
> 
>  


Re: refactor StreamConfig (Appending a picture)

Posted by Ted Yu <yu...@gmail.com>.
The picture didn't go through.

Please use a third-party site.

On Tue, Jul 4, 2017 at 7:09 AM, xu <xu...@163.com> wrote:

> Hi all,
>      I am sorry that I started working on StreamConfig
> (https://github.com/apache/flink/pull/4241) before discussing it here,
> since it may conflict with others' work.
>
>      Motivation:
>          A task contains one or more chained operators, yet the configs of
> the operators and of the task are all stored in StreamConfig. For example,
> when an operator is set up with the StreamConfig, it can see the interface
> for physicalEdges or chained.task.configs, which is confusing. Similarly, a
> streamTask should not see the interface for chain.index.
>          So we need to separate OperatorConfig from StreamConfig. A
> streamTask is initialized with the streamConfig, extracts the
> operatorConfigs from it, and builds a streamOperator from each
> operatorConfig.
>     OperatorConfig: used to set up the streamOperator; it holds only
> information that belongs to the streamOperator. It contains:
>        1)  operator information: name, id
>        2)  streamOperator
>        3)  input serializer
>        4)  output edges and serializers
>        5)  chain.index
>        6)  state.key.serializer
>
>      StreamConfig: for the streamTask to use:
>        1) in.physical.edges
>        2) out.physical.edges
>        3) chained OperatorConfigs
>        4) execution environment: checkpoint, state.backend and so on...
>
>     Proposed Change
>       I propose two overall changes:
>        1) Build the jobGraph from the streamGraph
>        2) Set up the StreamOperator with an operatorConfig, so the setup
> interface needs to change
>
>     (1) Build jobGraph from streamGraph
>        When building, we first get the operatorConfig of every streamNode,
> and then put the operatorConfigs of the streamNodes into a streamConfig
> when we chain them into a jobVertex.
>
>     (2) StreamOperator setup with OperatorConfig
>        An OperatorConfig is provided instead of the streamConfig when the
> streamOperator is set up. Following the advice of StephanEwen,
> OperatorConfig does not need a Map of "configKey" to values; it is just a
> serializable class with the respective fields. StreamConfig still relies on
> an underlying Configuration, because the streamConfig is shipped through
> its underlying configuration.
>
>       Others may have already thought about this, and maybe someone is
> already working on it, so I would appreciate your advice.
>
>       Thanks a lot for replying and Best Regards.
>
>       JiPing

Re:refactor StreamConfig (Appending a picture)

Posted by xu <xu...@163.com>.
Hi all,
     I am sorry that I started working on StreamConfig (https://github.com/apache/flink/pull/4241) before discussing it here, since it may conflict with others' work.


     Motivation:
         A task contains one or more chained operators, yet the configs of the operators and of the task are all stored in StreamConfig. For example, when an operator is set up with the StreamConfig, it can see the interface for physicalEdges or chained.task.configs, which is confusing. Similarly, a streamTask should not see the interface for chain.index.
         So we need to separate OperatorConfig from StreamConfig. A streamTask is initialized with the streamConfig, extracts the operatorConfigs from it, and builds a streamOperator from each operatorConfig.
    OperatorConfig: used to set up the streamOperator; it holds only information that belongs to the streamOperator. It contains:
       1)  operator information: name, id
       2)  streamOperator
       3)  input serializer
       4)  output edges and serializers
       5)  chain.index
       6)  state.key.serializer


     StreamConfig: for the streamTask to use:
       1) in.physical.edges
       2) out.physical.edges
       3) chained OperatorConfigs
       4) execution environment: checkpoint, state.backend and so on...

    Proposed Change
      I propose two overall changes:
       1) Build the jobGraph from the streamGraph
       2) Set up the StreamOperator with an operatorConfig, so the setup interface needs to change


    (1) Build jobGraph from streamGraph
       When building, we first get the operatorConfig of every streamNode, and then put the operatorConfigs of the streamNodes into a streamConfig when we chain them into a jobVertex.


    (2) StreamOperator setup with OperatorConfig
       An OperatorConfig is provided instead of the streamConfig when the streamOperator is set up. Following the advice of StephanEwen, OperatorConfig does not need a Map of "configKey" to values; it is just a serializable class with the respective fields. StreamConfig still relies on an underlying Configuration, because the streamConfig is shipped through its underlying configuration.

      Others may have already thought about this, and maybe someone is already working on it, so I would appreciate your advice.


      Thanks a lot for replying and Best Regards.


      JiPing