Posted to user@flink.apache.org by Qihua Yang <ya...@gmail.com> on 2022/05/03 16:24:22 UTC

flink task manager memory leak

Hi,

We have a Flink application that reads from a CockroachDB table and
sinks to a Kafka topic. The application runs in batch mode: every 6 hours
we submit a job that reads the table through the Flink JDBC connector, and
the job finishes once the whole table has been read.
On the graph we see JVM heap usage step up after each job finishes.
Redeploying the Flink application brings memory back to normal, but if I
don't redeploy, heap usage keeps climbing.
Is that a memory leak? How can we solve it?

The graph below shows the metric flink_taskmanager_Status_JVM_Memory_Heap_Used.
The big spike is the job consuming from the CockroachDB table.

[image: Screen Shot 2022-05-03 at 9.09.36 AM.png]

Here is how we configure the JDBC connector:

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(partitionKey)
    .setPartitionLowerBound(dbLowerBound)
    .setPartitionUpperBound(dbUpperBound)
    .setNumPartitions(partitionNum)
    .setFetchSize(BATCH_SIZE)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)
    .setMaxRetryTimes(2)
    .build()
val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build().getDataStream(env)

Re: flink task manager memory leak

Posted by Martijn Visser <ma...@apache.org>.
Hi,

Since Flink doesn't have a JDBC implementation for CockroachDB, did you
build your own custom implementation for it?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Wed, 4 May 2022 at 10:38, Flavio Pompermaier <po...@okkam.it>
wrote:

> Have you put the DB connector library in the Flink lib dir, or is it in
> the application jar?
> This usually happens when the DB driver library is bundled in the
> application jar rather than placed in the Flink lib dir.
>
> Best,
> Flavio

Re: flink task manager memory leak

Posted by Flavio Pompermaier <po...@okkam.it>.
Have you put the DB connector library in the Flink lib dir, or is it in
the application jar?
This usually happens when the DB driver library is bundled in the
application jar rather than placed in the Flink lib dir.

Best,
Flavio
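
One common reason for the pattern described above, stated here as an
assumption rather than something confirmed in this thread: a JDBC driver
bundled in the application jar is loaded by the job's user-code classloader
and registers itself with java.sql.DriverManager, which can keep that
classloader (and whatever it references) reachable after the job finishes.
Placing the driver jar in the Flink lib dir avoids this. Where that is not
possible, a minimal Scala sketch of a cleanup step might look like the
following; JdbcDriverCleanup and deregisterDriversLoadedBy are hypothetical
names for illustration, not part of Flink.

```scala
import java.sql.{Driver, DriverManager}
// JavaConverters is used for Scala 2.11/2.12 compatibility;
// on Scala 2.13+ scala.jdk.CollectionConverters is the replacement.
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Flink or the JDBC connector.
object JdbcDriverCleanup {

  /** Deregisters every JDBC driver whose class was loaded by `loader`
    * and returns how many drivers were deregistered.
    */
  def deregisterDriversLoadedBy(loader: ClassLoader): Int = {
    // Collect the matching drivers first, then deregister them.
    val owned: List[Driver] = DriverManager.getDrivers.asScala
      .filter(d => d.getClass.getClassLoader eq loader)
      .toList
    owned.foreach(d => DriverManager.deregisterDriver(d))
    owned.size
  }
}
```

It could be called when the job releases its resources, for example with
Thread.currentThread.getContextClassLoader as the argument; whether that is
the right classloader depends on how the job is deployed.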

On Tue, May 3, 2022 at 7:32 PM Qihua Yang <ya...@gmail.com> wrote:

> Here is the graph for a single TaskManager.
> [image: Screen Shot 2022-05-03 at 10.31.01 AM.png]

Re: flink task manager memory leak

Posted by Qihua Yang <ya...@gmail.com>.
Here is the graph for a single TaskManager.
[image: Screen Shot 2022-05-03 at 10.31.01 AM.png]

