Posted to user@flink.apache.org by neha goyal <ne...@gmail.com> on 2023/04/19 05:31:38 UTC

Flink not releasing the reference to a deleted jar file

Hello,

I am attaching sample code and a screenshot showing that Flink holds a
reference to a jar file even after I close the StreamExecutionEnvironment.

Because of this, the deleted file's disk space is never reclaimed and we
keep getting disk space alerts. When we restart our application, the
space is released. What is the correct way to gracefully shut down the
Flink environment so that it releases all of its resource references?

// Imports assume the Flink Kafka connector and JSON format are on the classpath.
import java.io.File;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TestResourceRelease {

    public void check() {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        try {
            StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
            // Register the UDF from an external jar; this is the file whose
            // handle is never released.
            env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
            TypeInformation[] typeInformationArray = getTypeInfoArray();
            String[] columnName = new String[]{"x", "y"};
            KafkaSource<Row> source = KafkaSource.<Row>builder()
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray)))
                    .setProperty("bootstrap.servers", "localhost:9092")
                    .setTopics("test")
                    .build();

            DataStream<Row> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            env.registerDataStream("test", stream);

            Table table = env.fromDataStream(stream);
            env.registerTable("my_test", table);
            Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
            System.out.println("created the table");
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            try {
                execEnv.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            // delete() removes the directory entry, but the disk space is not
            // reclaimed while the JVM still holds the jar open.
            File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
            file.delete();
        }
    }

    public static TypeInformation[] getTypeInfoArray() {
        TypeInformation[] typeInformations = new TypeInformation[2];
        typeInformations[0] = org.apache.flink.table.api.Types.LONG();
        typeInformations[1] = org.apache.flink.table.api.Types.LONG();
        return typeInformations;
    }
}
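
For reference, here is a minimal, Flink-free sketch of the JVM behavior
that seems to be at play: a URLClassLoader keeps a jar open until it is
explicitly closed, so delete() removes the directory entry but the space
is only reclaimed once the loader (or the whole JVM) goes away. The class
and jar names below are just the ones from my sample code.

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class JarHandleDemo {
    public static void main(String[] args) throws Exception {
        File jar = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
        URLClassLoader loader = new URLClassLoader(new URL[]{jar.toURI().toURL()});
        // Loading a class forces the loader to actually open the jar.
        loader.loadClass("com.my.udf.v2.EpochToDate");
        jar.delete();   // entry gone, but lsof still shows the jar as open (deleted)
        loader.close(); // only now is the handle released and the space freed
    }
}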

Re: Flink not releasing the reference to a deleted jar file

Posted by neha goyal <ne...@gmail.com>.
Hi Shammon,

The Flink job no longer exists after I close the execution environment,
right? Could you please try the attached code and confirm that I am not
sharing the file with any other job? In the code I posted, the file keeps
an open reference until I close the running Java application.
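
For what it's worth, the open handle is visible from inside the JVM with
a small Linux-only check (an illustrative helper, not a Flink API): on
Linux, /proc/self/fd holds one symlink per open file descriptor, and
links to deleted files carry a " (deleted)" suffix.

import java.io.File;
import java.nio.file.Files;

public class OpenFdCheck {
    public static void main(String[] args) throws Exception {
        File[] fds = new File("/proc/self/fd").listFiles();
        if (fds == null) return; // not on Linux, or /proc not mounted
        for (File fd : fds) {
            String target = Files.readSymbolicLink(fd.toPath()).toString();
            if (target.endsWith(" (deleted)")) {
                System.out.println("fd " + fd.getName() + " -> " + target);
            }
        }
    }
}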

On Thu, Apr 20, 2023 at 7:25 AM Shammon FY <zj...@gmail.com> wrote:

> Hi neha
>
> Flink can delete a job's runtime data when the job terminates. But for
> external files, such as the UDF jar files you mentioned, I think you
> need to manage them yourself. The files may be shared between jobs, so
> they cannot be deleted just because one Flink job exits.
>
> Best,
> Shammon FY

Re: Flink not releasing the reference to a deleted jar file

Posted by Shammon FY <zj...@gmail.com>.
Hi neha

Flink can delete a job's runtime data when the job terminates. But for
external files, such as the UDF jar files you mentioned, I think you
need to manage them yourself. The files may be shared between jobs, so
they cannot be deleted just because one Flink job exits.
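
If you submit several jobs from the same process, one sketch of managing
the files yourself is to reference-count each jar and delete it only when
the last job using it has finished. This is illustrative only, not a
Flink API:

import java.io.File;
import java.util.HashMap;
import java.util.Map;

public class SharedJarManager {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Call when a job starts using the jar at this path.
    public synchronized void acquire(String path) {
        refCounts.merge(path, 1, Integer::sum);
    }

    // Call when a job finishes; deletes the jar once no job uses it.
    public synchronized void release(String path) {
        int left = refCounts.merge(path, -1, Integer::sum);
        if (left <= 0) {
            refCounts.remove(path);
            new File(path).delete(); // no job references the jar any more
        }
    }
}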

Best,
Shammon FY


On Wed, Apr 19, 2023 at 1:37 PM neha goyal <ne...@gmail.com> wrote:

> Adding to the above query, I have tried dropping the tables and the
> function as well but no luck.

Re: Flink not releasing the reference to a deleted jar file

Posted by neha goyal <ne...@gmail.com>.
Adding to the above query, I have tried dropping the tables and the
function as well but no luck.
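
Concretely, the cleanup I tried looked roughly like this (continuing from
check() in the sample code, with the same names); it did not release the
jar handle either:

// Attempted cleanup, run before execEnv.close(); the handle survives it.
env.executeSql("DROP TEMPORARY VIEW IF EXISTS `test`");
env.executeSql("DROP TEMPORARY VIEW IF EXISTS `my_test`");
env.executeSql("DROP FUNCTION IF EXISTS EpochToDate");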
