Posted to user@flink.apache.org by Tobias Fröhlich <to...@scoop-software.de> on 2023/02/14 13:26:34 UTC

Metrics or runtimeContext in global commit

Dear Flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables), which I use for the global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: when I implement the Committer interface, I cannot get the runtimeContext that I need for the metrics, because a Committer is not an operator.

The only solution I found was to clone the Flink source code and amend it as follows:

 1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that implements Committer<CommT> and has:
    - an additional field for the runtimeContext
    - setter and getter for this field
    - an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:

    if (committer instanceof CommitterWithRuntimeContext) {
        ((CommitterWithRuntimeContext<CommT>) committer).setRuntimeContext(getRuntimeContext());
        ((CommitterWithRuntimeContext<CommT>) committer).init();
    }

I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.
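
A minimal, self-contained sketch of the two steps above, so the pattern can be tried without a patched Flink build. Everything except the name CommitterWithRuntimeContext and its setter, getter and init() is an assumption made for illustration: RuntimeContextStandIn and CommitterStandIn are hypothetical stand-ins for Flink's RuntimeContext and Committer (the real commit method takes a Collection<CommitRequest<CommT>>), setup() only mimics the lines added to GlobalCommitterOperator, and CountingCommitter, the counter name and demo() are invented for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class CommitterContextSketch {

    // Hypothetical stand-in for Flink's RuntimeContext: only a counter hook.
    interface RuntimeContextStandIn {
        void incrementCounter(String name, long delta);
    }

    // Hypothetical, simplified stand-in for Flink's Committer interface.
    interface CommitterStandIn<CommT> {
        void commit(Collection<CommT> committables);
    }

    // Step 1: a committer base class that can be handed a runtime context.
    abstract static class CommitterWithRuntimeContext<CommT>
            implements CommitterStandIn<CommT> {
        private RuntimeContextStandIn runtimeContext;

        public void setRuntimeContext(RuntimeContextStandIn runtimeContext) {
            this.runtimeContext = runtimeContext;
        }

        public RuntimeContextStandIn getRuntimeContext() {
            return runtimeContext;
        }

        // Called once by the operator after the context has been injected.
        public abstract void init();
    }

    // Step 2: what the operator's setup() does with its own context.
    @SuppressWarnings("unchecked")
    static <CommT> void setup(CommitterStandIn<CommT> committer,
                              RuntimeContextStandIn operatorContext) {
        if (committer instanceof CommitterWithRuntimeContext) {
            CommitterWithRuntimeContext<CommT> withContext =
                    (CommitterWithRuntimeContext<CommT>) committer;
            withContext.setRuntimeContext(operatorContext);
            withContext.init();
        }
    }

    // Hypothetical user committer that counts the committables it sees.
    static class CountingCommitter extends CommitterWithRuntimeContext<String> {
        private boolean initialized;

        @Override
        public void init() {
            initialized = true;
        }

        @Override
        public void commit(Collection<String> committables) {
            if (!initialized) {
                throw new IllegalStateException("init() was not called");
            }
            getRuntimeContext().incrementCounter(
                    "globalCommittables", committables.size());
        }
    }

    // Wires the pieces together and returns the recorded counter value.
    public static long demo() {
        Map<String, Long> counters = new HashMap<>();
        RuntimeContextStandIn context =
                (name, delta) -> counters.merge(name, delta, Long::sum);

        CountingCommitter committer = new CountingCommitter();
        setup(committer, context);
        committer.commit(Arrays.asList("a", "b", "c"));
        return counters.getOrDefault("globalCommittables", 0L);
    }

    public static void main(String[] args) {
        System.out.println("committables counted: " + demo());
    }
}
```

In an actual Flink job the injected object would be the operator's real RuntimeContext, and the counter would presumably be registered in init() through getRuntimeContext().getMetricGroup() rather than through the stand-in hook used here.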

Is there another way to get the runtimeContext in a global commit? If not, would it be justified to propose a feature request for a future release, so that the global commit can be implemented with access to the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich

Re: Metrics or runtimeContext in global commit

Posted by Tobias Fröhlich <to...@scoop-software.de>.
Dear Yuxia,

Thank you for your answer! We came to the same conclusion, and my colleague has already proposed this feature.

Best regards,
Tobias


----- Original Message -----
From: "yuxia" <lu...@alumni.sjtu.edu.cn>
To: "Dr. Tobias Fröhlich" <to...@scoop-software.de>
CC: "User" <us...@flink.apache.org>, "dev" <de...@flink.apache.org>
Sent: Monday, 20 February 2023 03:31:22
Subject: Re: Metrics or runtimeContext in global commit

It seems there is no other way to get the runtimeContext in a global commit. I think it's reasonable to propose the feature.
I added the flink-devs channel to get more attention and discussion from the Flink devs.

Best regards,
Yuxia


Re: Metrics or runtimeContext in global commit

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
It seems there is no other way to get the runtimeContext in a global commit. I think it's reasonable to propose the feature.
I added the flink-devs channel to get more attention and discussion from the Flink devs.

Best regards,
Yuxia

