You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Tobias Fröhlich <to...@scoop-software.de> on 2023/02/14 13:26:34 UTC
Metrics or runtimeContext in global commit
Dear flink team,
I would like to use metrics (which are then written to an influxdb) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.
The problem is: When I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an Operator.
The only solution I found was by cloning the flink source code and amending it in the following way:
1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that implements Committer<CommT> and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"
2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:
if (committer instanceof CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext<CommT>) committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext<CommT>) committer).init();
}
I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.
Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?
Best regards and thanks in advance
Tobias Fröhlich
Re: Metrics or runtimeContext in global commit
Posted by Tobias Fröhlich <to...@scoop-software.de>.
Dear Yuxia,
thank you for your answer! This is also our conclusion and my colleague has already proposed this feature.
Best regards,
Tobias
----- Ursprüngliche Mail -----
Von: "yuxia" <lu...@alumni.sjtu.edu.cn>
An: "Dr. Tobias Fröhlich" <to...@scoop-software.de>
CC: "User" <us...@flink.apache.org>, "dev" <de...@flink.apache.org>
Gesendet: Montag, 20. Februar 2023 03:31:22
Betreff: Re: Metrics or runtimeContext in global commit
It seems no other way to get the runtimeContext in a global commit. For me, I think it's reasoable to propose the fetature.
I added flink-devs channel for more attention/discussion in flink devs.
Best regards,
Yuxia
----- 原始邮件 -----
发件人: "Tobias Fröhlich" <to...@scoop-software.de>
收件人: "User" <us...@flink.apache.org>
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit
Dear flink team,
I would like to use metrics (which are then written to an influxdb) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.
The problem is: When I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an Operator.
The only solution I found was by cloning the flink source code and amending it in the following way:
1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that implements Committer<CommT> and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"
2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:
if (committer instanceof CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext<CommT>) committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext<CommT>) committer).init();
}
I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.
Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?
Best regards and thanks in advance
Tobias Fröhlich
Re: Metrics or runtimeContext in global commit
Posted by Tobias Fröhlich <to...@scoop-software.de>.
Dear Yuxia,
thank you for your answer! This is also our conclusion and my colleague has already proposed this feature.
Best regards,
Tobias
----- Ursprüngliche Mail -----
Von: "yuxia" <lu...@alumni.sjtu.edu.cn>
An: "Dr. Tobias Fröhlich" <to...@scoop-software.de>
CC: "User" <us...@flink.apache.org>, "dev" <de...@flink.apache.org>
Gesendet: Montag, 20. Februar 2023 03:31:22
Betreff: Re: Metrics or runtimeContext in global commit
It seems no other way to get the runtimeContext in a global commit. For me, I think it's reasoable to propose the fetature.
I added flink-devs channel for more attention/discussion in flink devs.
Best regards,
Yuxia
----- 原始邮件 -----
发件人: "Tobias Fröhlich" <to...@scoop-software.de>
收件人: "User" <us...@flink.apache.org>
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit
Dear flink team,
I would like to use metrics (which are then written to an influxdb) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.
The problem is: When I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an Operator.
The only solution I found was by cloning the flink source code and amending it in the following way:
1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that implements Committer<CommT> and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"
2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:
if (committer instanceof CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext<CommT>) committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext<CommT>) committer).init();
}
I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.
Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?
Best regards and thanks in advance
Tobias Fröhlich
Re: Metrics or runtimeContext in global commit
Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
It seems no other way to get the runtimeContext in a global commit. For me, I think it's reasoable to propose the fetature.
I added flink-devs channel for more attention/discussion in flink devs.
Best regards,
Yuxia
----- 原始邮件 -----
发件人: "Tobias Fröhlich" <to...@scoop-software.de>
收件人: "User" <us...@flink.apache.org>
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit
Dear flink team,
I would like to use metrics (which are then written to an influxdb) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.
The problem is: When I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an Operator.
The only solution I found was by cloning the flink source code and amending it in the following way:
1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that implements Committer<CommT> and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"
2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:
if (committer instanceof CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext<CommT>) committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext<CommT>) committer).init();
}
I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.
Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?
Best regards and thanks in advance
Tobias Fröhlich
Re: Metrics or runtimeContext in global commit
Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
It seems no other way to get the runtimeContext in a global commit. For me, I think it's reasoable to propose the fetature.
I added flink-devs channel for more attention/discussion in flink devs.
Best regards,
Yuxia
----- 原始邮件 -----
发件人: "Tobias Fröhlich" <to...@scoop-software.de>
收件人: "User" <us...@flink.apache.org>
发送时间: 星期二, 2023年 2 月 14日 下午 9:26:34
主题: Metrics or runtimeContext in global commit
Dear flink team,
I would like to use metrics (which are then written to an influxdb) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.
The problem is: When I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an Operator.
The only solution I found was by cloning the flink source code and amending it in the following way:
1. declaring an abstract class "CommitterWithRuntimeContext<CommT>" that implements Committer<CommT> and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"
2. in the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext) adding the following lines at the end:
if (committer instanceof CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext<CommT>) committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext<CommT>) committer).init();
}
I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.
Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?
Best regards and thanks in advance
Tobias Fröhlich