Posted to dev@flink.apache.org by Bruce Zu <zu...@gmail.com> on 2022/07/12 03:42:08 UTC

A common question on how to release resources in a Flink job

Hi Flink team,
I have a general question about how to release a resource held by an object
in a Flink job.

Thank you in advance! I am new to Flink. I searched a lot but did not find
any related documentation, although I am sure there is a standard way to
handle this.

In our case, we need to access an Elasticsearch server. We use a class,
EsClient, which extends `org.elasticsearch.client.RestHighLevelClient`, to
run queries. Its `close` method must be called once the client is no longer
needed, to release resources.

So I need to find the right place to call it, so that the resource is
always released, even if an exception is thrown somewhere in a method
invoked from the main class or from another class.

What I have come up with so far is to use a `ThreadLocal` to hold the
EsClient object: set it at the start of the main class’s main method and
release the resource at the end of the main method.

The pseudocode (Java) looks like this:
```java
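import java.io.IOException;
import java.util.Properties;

public class EsClientHolder {
  // InheritableThreadLocal: threads forked after set() inherit the value.
  private static final ThreadLocal<EsClient> local = new InheritableThreadLocal<>();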

  public static final void createAndSetEsClient(EsClient esClient) {
    local.set(esClient);
  }

  public static final void createAndSetEsClientBy(EsClientConfig esClientConfig) {
    // Pseudocode: the real code would build the client from esClientConfig.
    EsClient instance = new EsClient();
    local.set(instance);
  }

  public static final EsClient get() {
    EsClient c = local.get();
    if (c == null) {
      throw new RuntimeException(
          "Make sure to create and set the EsClient instance before using it");
    }
    return c;
  }

  public static final void close() throws IOException {
    EsClient o = local.get();
    if (o != null) {
      o.close();
      // Clear this thread's reference so the closed client is not handed out again.
      local.remove();
    }
  }

} // end of EsClientHolder

// Usage in Flink application code
public class MainClass {
  public static void main(String[] args) throws IOException {
    try {
      Properties prop = null;
      EsClientConfig config = getEsClientConfig(prop);
      EsClientHolder.createAndSetEsClientBy(config);
      // …
      new SomeClass().method1();
      new OtherClass().method2();
      // ...
    } finally {
      EsClientHolder.close();
    }
  }
}

class SomeClass {
  public void method1() {
    // 1. Use the EsClient in any invoked method of any other class:
    EsClient esClient = EsClientHolder.get();
    // …
  }
}

class OtherClass {
  public void method2() {
    // 2. Use the EsClient in any invoked method of any forked child thread
    new Thread(
            () -> {
              EsClient client = EsClientHolder.get();
              // …
            })
        .start();
    // …
  }
}

```

I understand that a TaskManager is a JVM process and that each task is
executed by a Java thread.

But I do not know how Flink builds the job graph, how it eventually assigns
tasks to threads, or how those threads relate to each other.

For example, if Flink runs method1 of SomeClass and method2 of OtherClass
in a different thread from the one running MainClass, then the thread
running method1 and method2 has no way to get the EsClient.
Here I am assuming that the main method of MainClass is executed in a single
thread. If it is not, and set() and close() run in different threads, then
there is no way to release the resource.
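
To make this concern concrete, here is a minimal JDK-only sketch (no Flink and no Elasticsearch involved). `ThreadLocalVisibilityDemo` is a hypothetical stand-in for `EsClientHolder`, a plain `String` stands in for the `EsClient`, and a single-thread executor stands in for any thread that already existed before `set()` was called. I am not claiming this is how Flink schedules tasks; it only shows which threads can see a value stored in an `InheritableThreadLocal`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalVisibilityDemo {
  // Hypothetical stand-in for EsClientHolder.local; a String replaces the EsClient.
  private static final ThreadLocal<String> LOCAL = new InheritableThreadLocal<>();

  /** Returns {value seen by a forked child thread, value seen by a pre-existing pool thread}. */
  static String[] run() throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // Force the pool's worker thread to be created BEFORE the value is set.
      pool.submit(() -> {}).get();

      LOCAL.set("client");

      // Case 1: a thread forked by the current thread after set() inherits the value.
      String[] seenByChild = new String[1];
      Thread child = new Thread(() -> seenByChild[0] = LOCAL.get());
      child.start();
      child.join();

      // Case 2: the worker thread was created earlier, so it inherited nothing.
      Callable<String> readInPool = LOCAL::get;
      String seenByPool = pool.submit(readInPool).get();

      return new String[] {seenByChild[0], seenByPool};
    } finally {
      LOCAL.remove();
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    String[] seen = run();
    System.out.println("forked child thread sees: " + seen[0]);
    System.out.println("pre-existing pool thread sees: " + seen[1]);
  }
}
```

Running `main` prints `client` for the forked child thread and `null` for the pool thread. So the holder above only works if every thread that calls `EsClientHolder.get()` was forked from the thread that called `set()`, and forked after that call, which is exactly what I cannot tell about Flink's task threads.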

Thank you!