You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Bruce Zu <zu...@gmail.com> on 2022/07/12 04:35:31 UTC
请教:关于如何释放 Flink Job 中某个对象持有的资源
Flink team好,
我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。
我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从
org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作,
一旦不再使用它就需要调用它的`close`方法来释放资源。
所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常
我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且
在 main 方法结束时释放资源。
类似这样的伪代码:
```java
公共类 EsClientHolder {
private static final ThreadLocal<EsClient> local = new
InheritableThreadLocal<>();
public static final void createAndSetEsClient(EsClient esClient){
local.set(esClient);
}
private static final createAndSetEsClientBy(EsClientConfig
esClientConfig){
EsClient instance = new EsClient(esClientConfig);
createAndSetEsClient(instance) ;
}
private static final EsClient get() {
EsClient c = local.get();
if(c == null){
throw new RuntimeException("确保在使用前创建并设置 EsClient 实例");
}
return c;
}
private static final close()抛出 IOException {
EsClient o = local.get();
if(o!= null){
o.close();
}
}
// 在 Fink 应用程序代码中的用法
public class main class {
public static void main(String[] args) throws IOException {
try {
property prop = null;
EsClientConfig configuration = getEsClientConfig(prop);
EsClientHolder.createAndSetEsClientBy(config);
// …
SomeClass.method1();
other classes.method2();
// ...
} at last {
EsClientHolder.close();
}
}
}
class SomeClass{
public void. method 1(){
// 1. Use EsClient in any calling method of any other class:
EsClient esClient = EsClientHolder.get();
// …
}
}
class other class {
public void method 2() {
// 2. Use EsClient in any calling method of any forked child thread
new thread (
() -> {
EsClient client = EsClientHolder.get();
// …
})
. start();
// …
}
}
```
我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。
但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。
比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass
的线程不一样的线程,
那么运行method1和mehod2的线程就没有办法拿到EsClient了。
这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close() 拆分为在不同的线程中运行,则就
没有办法释放资源。
谢谢!
Re: 请教:关于如何释放 Flink Job 中某个对象持有的资源
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
不建议在 TM 内部多个 Task 间共享变量,每个 Task 单独使用自己的资源,在 RichFunction open 时初始化资源,close
时释放资源。否则容易导致资源泄露
Best,
Weihua
On Tue, Jul 12, 2022 at 2:31 PM RS <ti...@163.com> wrote:
> Hi,
>
>
> 如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法
>
>
>
> 资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象,
>
>
> Thanks
>
>
> 在 2022-07-12 12:35:31,"Bruce Zu" <zu...@gmail.com> 写道:
> > Flink team好,
> > 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
> >
> > 我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。
> >
> >我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从
> >org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作,
> >一旦不再使用它就需要调用它的`close`方法来释放资源。
> >
> >所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常
> >
> >我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且
> >在 main 方法结束时释放资源。
> >
> >类似这样的伪代码:
> >```java
> >公共类 EsClientHolder {
> > private static final ThreadLocal<EsClient> local = new
> >InheritableThreadLocal<>();
> >
> > public static final void createAndSetEsClient(EsClient esClient){
> > local.set(esClient);
> > }
> >
> > private static final createAndSetEsClientBy(EsClientConfig
> >esClientConfig){
> > EsClient instance = new EsClient(esClientConfig);
> > createAndSetEsClient(instance) ;
> > }
> >
> > private static final EsClient get() {
> > EsClient c = local.get();
> > if(c == null){
> > throw new RuntimeException("确保在使用前创建并设置 EsClient 实例");
> > }
> > return c;
> > }
> >
> > private static final close()抛出 IOException {
> > EsClient o = local.get();
> > if(o!= null){
> > o.close();
> > }
> > }
> >
> >// 在 Fink 应用程序代码中的用法
> > public class main class {
> > public static void main(String[] args) throws IOException {
> > try {
> > property prop = null;
> > EsClientConfig configuration = getEsClientConfig(prop);
> > EsClientHolder.createAndSetEsClientBy(config);
> > // …
> > SomeClass.method1();
> > other classes.method2();
> > // ...
> > } at last {
> > EsClientHolder.close();
> > }
> > }
> > }
> >
> >class SomeClass{
> > public void. method 1(){
> > // 1. Use EsClient in any calling method of any other class:
> > EsClient esClient = EsClientHolder.get();
> > // …
> > }
> >}
> >class other class {
> > public void method 2() {
> > // 2. Use EsClient in any calling method of any forked child thread
> > new thread (
> > () -> {
> > EsClient client = EsClientHolder.get();
> > // …
> > })
> > . start();
> > // …
> > }
> >}
> >
> >```
> >
> >我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。
> >
> >但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。
> >
> >比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass
> >的线程不一样的线程,
> >那么运行method1和mehod2的线程就没有办法拿到EsClient了。
> >这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close()
> 拆分为在不同的线程中运行,则就
> >没有办法释放资源。
> >
> >谢谢!
>
Re:请教:关于如何释放 Flink Job 中某个对象持有的资源
Posted by RS <ti...@163.com>.
Hi,
如果是访问ES的话,Flink里面自带ES的connector,你可以直接使用,或者参考源码,source和sink接口都有对应的方法
资源是否在一个线程里面,这个取决与你代码逻辑,如果在不同的线程或者进程的话,设计上,就不要用同一个EsClientHolder,各个不同阶段各自去new和close对象,
Thanks
在 2022-07-12 12:35:31,"Bruce Zu" <zu...@gmail.com> 写道:
> Flink team好,
> 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
>
> 我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。
>
>我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从
>org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作,
>一旦不再使用它就需要调用它的`close`方法来释放资源。
>
>所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常
>
>我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且
>在 main 方法结束时释放资源。
>
>类似这样的伪代码:
>```java
>公共类 EsClientHolder {
> private static final ThreadLocal<EsClient> local = new
>InheritableThreadLocal<>();
>
> public static final void createAndSetEsClient(EsClient esClient){
> local.set(esClient);
> }
>
> private static final createAndSetEsClientBy(EsClientConfig
>esClientConfig){
> EsClient instance = new EsClient(esClientConfig);
> createAndSetEsClient(instance) ;
> }
>
> private static final EsClient get() {
> EsClient c = local.get();
> if(c == null){
> throw new RuntimeException("确保在使用前创建并设置 EsClient 实例");
> }
> return c;
> }
>
> private static final close()抛出 IOException {
> EsClient o = local.get();
> if(o!= null){
> o.close();
> }
> }
>
>// 在 Fink 应用程序代码中的用法
> public class main class {
> public static void main(String[] args) throws IOException {
> try {
> property prop = null;
> EsClientConfig configuration = getEsClientConfig(prop);
> EsClientHolder.createAndSetEsClientBy(config);
> // …
> SomeClass.method1();
> other classes.method2();
> // ...
> } at last {
> EsClientHolder.close();
> }
> }
> }
>
>class SomeClass{
> public void. method 1(){
> // 1. Use EsClient in any calling method of any other class:
> EsClient esClient = EsClientHolder.get();
> // …
> }
>}
>class other class {
> public void method 2() {
> // 2. Use EsClient in any calling method of any forked child thread
> new thread (
> () -> {
> EsClient client = EsClientHolder.get();
> // …
> })
> . start();
> // …
> }
>}
>
>```
>
>我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。
>
>但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。
>
>比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass
>的线程不一样的线程,
>那么运行method1和mehod2的线程就没有办法拿到EsClient了。
>这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close() 拆分为在不同的线程中运行,则就
>没有办法释放资源。
>
>谢谢!