Posted to user@flink.apache.org by Qihua Yang <ya...@gmail.com> on 2022/01/19 07:25:25 UTC

JDBC read DB causes OutOfMemoryError: Java heap space

Hi,

I have a Flink cluster (50 hosts, each running one task manager).
I am using Flink JDBC to consume data from a database. The DB table is
fairly large, around 187,340,000 rows. I configured the number of JDBC
partitions as 50 and the fetchSize as 20. The Flink application has 50 task managers.
Does anyone know why I got an OutOfMemoryError? How should I configure it?

Thanks,
Qihua

Re: JDBC read DB causes OutOfMemoryError: Java heap space

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Sorry for the late reply. Which Flink version are you using? The current
Flink master branch no longer has JdbcTableSource.
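In newer Flink releases a JDBC source is usually declared through the SQL / Table API `jdbc` connector rather than `JdbcTableSource`. As a hedged sketch, the settings used in this thread map roughly onto the documented connector options below. The helper `jdbcDdl`, its parameters, and every value passed to it are hypothetical; credentials (`'username'`, `'password'`) are omitted, and the option names should be checked against the documentation of the Flink version in use.

```scala
// Sketch: build a Flink SQL DDL statement for the JDBC table connector.
// Option keys follow the Flink JDBC SQL connector documentation; the helper
// itself and all argument values are hypothetical. Values are not escaped.
object JdbcDdl {
  def jdbcDdl(
      flinkTable: String,     // name of the table inside Flink
      columns: String,        // column list, e.g. "id BIGINT, payload STRING"
      url: String,            // JDBC URL of the source database
      tableName: String,      // table in the source database
      partitionColumn: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      fetchSize: Int
  ): String =
    s"""CREATE TABLE $flinkTable (
       |  $columns
       |) WITH (
       |  'connector' = 'jdbc',
       |  'url' = '$url',
       |  'table-name' = '$tableName',
       |  'scan.partition.column' = '$partitionColumn',
       |  'scan.partition.num' = '$numPartitions',
       |  'scan.partition.lower-bound' = '$lowerBound',
       |  'scan.partition.upper-bound' = '$upperBound',
       |  'scan.fetch-size' = '$fetchSize'
       |)""".stripMargin
}
```

The resulting string would then be passed to `TableEnvironment.executeSql`, after which the table can be queried or converted to a `DataStream`.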

On Wed, Jan 19, 2022 at 16:00, Qihua Yang <ya...@gmail.com> wrote:


Re: JDBC read DB causes OutOfMemoryError: Java heap space

Posted by Qihua Yang <ya...@gmail.com>.
Should I change the query, for example by adding a LIMIT as below? Without
a limit, does that mean Flink will read the whole huge table into memory and
then return 20 records at a time?

val query = String.format("SELECT * FROM %s limit 1000", tableName)
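A rough estimate helps frame this question. The sketch below compares the heap one source subtask would need if the driver keeps only `fetchSize` rows in memory against the heap needed if it materializes that subtask's whole partition. The figure of 200 bytes per row is a hypothetical assumption (the real in-memory size depends on the schema and the driver), rows are assumed to be spread evenly over the 50 partitions, and 512 MB is the heap size Qihua reports in this thread.

```scala
// Back-of-the-envelope heap estimate for one source subtask.
// ASSUMPTIONS (hypothetical): ~200 bytes of heap per materialized row,
// and rows spread evenly across the partitions.
object HeapEstimate {
  val totalRows = 187340000L         // table size from the thread
  val numPartitions = 50             // JDBC partitions from the thread
  val fetchSize = 20                 // fetchSize from the thread
  val bytesPerRow = 200L             // hypothetical average row footprint
  val heapBytes = 512L * 1024 * 1024 // 512 MB heap from the thread

  val rowsPerPartition: Long = totalRows / numPartitions
  // Memory for rows held at once if the driver honours the fetch size.
  val bufferedBytes: Long = fetchSize * bytesPerRow
  // Memory for rows held at once if the whole partition is materialized.
  val materializedBytes: Long = rowsPerPartition * bytesPerRow

  def main(args: Array[String]): Unit = {
    println(s"rows per partition: $rowsPerPartition")
    println(s"buffered with fetchSize=$fetchSize: $bufferedBytes bytes")
    println(s"fully materialized: $materializedBytes bytes")
    println(s"exceeds 512 MB heap: ${materializedBytes > heapBytes}")
  }
}
```

Under these assumptions the fetch-size-bounded read holds about 4,000 bytes of row data, while materializing one partition (3,746,800 rows, about 749 million bytes) would not fit in a 512 MB heap.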


On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang <ya...@gmail.com> wrote:

> Hi Caizhi,
>
> Thank you for your reply. The heap size is 512m. Fetching from the DB
> table is the only costly operation. After fetching from DB, I simply
> ingested a kafka topic. That should not be the bottleneck.
> Here is the jdbc configuration. Is that correct config?
>
> val query = String.format("SELECT * FROM %s", tableName)
>
> val options = JdbcOptions.builder()
>     .setDBUrl(url)
>     .setTableName(tableName)
>     .setDriverName(DRIVER_NAME)
>     .setUsername(userName)
>     .setPassword(password)
>     .build()
> val readOptions = JdbcReadOptions.builder()
>     .setQuery(query)
>     .setPartitionColumnName(PARTITION_KEY)
>     .setPartitionLowerBound(dbLowerBound)
>     .setPartitionUpperBound(dbUpperBound)
>     .setNumPartitions(50)
>     .setFetchSize(20)
>     .build()
> val lookupOptions = JdbcLookupOptions.builder()
>     .setCacheMaxSize(-1)
>     .setCacheExpireMs(1000)
>     .setMaxRetryTimes(2)
>     .build()
> val rawSource = JdbcTableSource.builder()
>     .setOptions(options)
>     .setReadOptions(readOptions)
>     .setLookupOptions(lookupOptions)
>     .setSchema(schema)
>     .build().getDataStream(env)
>
>
> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> This is not the desired behavior. As you have set fetchSize to 20 there
>> will be only 20 records in each parallelism of the source. How large is
>> your heap size? Does your job have any other operations which consume a lot
>> of heap memory?
>>
>> Qihua Yang <ya...@gmail.com> 于2022年1月19日周三 15:27写道:
>>
>>> Here is the errors
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "server-timer"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>>
>>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <ya...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a flink cluster(50 hosts, each host runs a task manager).
>>>> I am using Flink JDBC to consume data from a database. The db table is
>>>> pretty large, around 187340000 rows. I configured the JDBC number of
>>>> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
>>>> Anyone know why I got OutOfMemoryError? How should I config it?
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>>

Re: JDBC read DB causes OutOfMemoryError: Java heap space

Posted by Qihua Yang <ya...@gmail.com>.
Hi Caizhi,

Thank you for your reply. The heap size is 512 MB. Reading from the DB table
is the only costly operation; after fetching the rows, I simply write them to a
Kafka topic, which should not be the bottleneck.
Here is the JDBC configuration. Is it correct?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(dbLowerBound)
    .setPartitionUpperBound(dbUpperBound)
    .setNumPartitions(50)
    .setFetchSize(20)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(1000)
    .setMaxRetryTimes(2)
    .build()
val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build().getDataStream(env)
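The `setPartitionLowerBound`, `setPartitionUpperBound` and `setNumPartitions(50)` calls above ask the source to divide the partition column's value range into 50 slices, one per parallel reader. A minimal sketch of such an even split into inclusive ranges follows; it is an illustration written for this thread, not Flink's own implementation, and the bounds in `main` are hypothetical. The row count per slice is only close to 187,340,000 / 50 = 3,746,800 if the key values are dense and evenly distributed between the bounds; skewed keys give some readers many more rows than others.

```scala
// Sketch: split an inclusive key range [lower, upper] into n contiguous,
// non-overlapping ranges whose sizes differ by at most one. Each (start, end)
// pair could parameterize a "key BETWEEN ? AND ?" predicate.
object PartitionRanges {
  def split(lower: Long, upper: Long, n: Int): Seq[(Long, Long)] = {
    require(n > 0, "number of partitions must be positive")
    require(upper >= lower, "upper bound must not be below lower bound")
    val total = upper - lower + 1
    val parts = math.min(n.toLong, total).toInt // never produce empty ranges
    val base = total / parts
    val remainder = total % parts
    (0 until parts).map { i =>
      // The first `remainder` ranges each take one extra key value.
      val start = lower + i * base + math.min(i.toLong, remainder)
      val size = base + (if (i < remainder) 1L else 0L)
      (start, start + size - 1)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical bounds: a dense key running from 1 to 187,340,000.
    val ranges = split(1L, 187340000L, 50)
    println(ranges.head) // (1,3746800)
    println(ranges.last) // (183593201,187340000)
  }
}
```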


On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> This is not the desired behavior. As you have set fetchSize to 20 there
> will be only 20 records in each parallelism of the source. How large is
> your heap size? Does your job have any other operations which consume a lot
> of heap memory?
>
> Qihua Yang <ya...@gmail.com> 于2022年1月19日周三 15:27写道:
>
>> Here is the errors
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "server-timer"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>
>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <ya...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a flink cluster(50 hosts, each host runs a task manager).
>>> I am using Flink JDBC to consume data from a database. The db table is
>>> pretty large, around 187340000 rows. I configured the JDBC number of
>>> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
>>> Anyone know why I got OutOfMemoryError? How should I config it?
>>>
>>> Thanks,
>>> Qihua
>>>
>>>

Re: JDBC read DB causes OutOfMemoryError: Java heap space

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

This is not the expected behavior. Since you have set fetchSize to 20, there
will be only 20 records at a time in each parallel instance of the source.
How large is your heap? Does your job have any other operations that consume
a lot of heap memory?
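The idea behind a fetch size can be illustrated without a database. The self-contained simulation below (not Flink or JDBC driver code) pulls rows from a hypothetical `fetchBatch(offset, limit)` function in batches of `fetchSize` and exposes them as a lazy `Iterator`, so at most `fetchSize` rows are buffered at any moment regardless of how many rows the source holds. A real driver may behave differently: in the JDBC API, `java.sql.Statement.setFetchSize` is only a hint, and some drivers buffer the full result set unless configured otherwise.

```scala
// Simulation of fetch-size-bounded reading; no database or JDBC involved.
// `fetchBatch(offset, limit)` is a hypothetical stand-in for one round trip
// that returns at most `limit` rows starting at `offset`.
final class BatchedReader[A](fetchBatch: (Long, Int) => Seq[A], fetchSize: Int)
    extends Iterator[A] {
  require(fetchSize > 0, "fetchSize must be positive")

  private var buffer: List[A] = Nil // rows currently held in memory
  private var offset = 0L           // offset of the next batch to request
  private var exhausted = false     // set once a short batch comes back
  private var peak = 0              // largest number of rows buffered at once

  /** Largest number of rows this reader has held in memory at one time. */
  def maxBuffered: Int = peak

  // Request the next batch only after the current one is fully consumed.
  private def refill(): Unit =
    if (buffer.isEmpty && !exhausted) {
      val batch = fetchBatch(offset, fetchSize).toList
      offset += batch.size
      exhausted = batch.size < fetchSize
      buffer = batch
      peak = math.max(peak, batch.size)
    }

  override def hasNext: Boolean = { refill(); buffer.nonEmpty }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    val row = buffer.head
    buffer = buffer.tail
    row
  }
}

object BatchedReaderDemo {
  def main(args: Array[String]): Unit = {
    val totalRows = 1000L // small stand-in for one partition's rows
    // Hypothetical "table": returns row ids in [offset, offset + limit).
    val fetch = (offset: Long, limit: Int) =>
      (offset until math.min(offset + limit, totalRows)).map(_.toInt)
    val reader = new BatchedReader[Int](fetch, fetchSize = 20)
    val count = reader.foldLeft(0L)((n, _) => n + 1)
    println(s"read $count rows, peak buffer ${reader.maxBuffered}")
    // prints: read 1000 rows, peak buffer 20
  }
}
```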

On Wed, Jan 19, 2022 at 15:27, Qihua Yang <ya...@gmail.com> wrote:

> Here is the errors
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "server-timer"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "I/O dispatcher 16"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "I/O dispatcher 11"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>
> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <ya...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a flink cluster(50 hosts, each host runs a task manager).
>> I am using Flink JDBC to consume data from a database. The db table is
>> pretty large, around 187340000 rows. I configured the JDBC number of
>> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
>> Anyone know why I got OutOfMemoryError? How should I config it?
>>
>> Thanks,
>> Qihua
>>
>>

Re: JDBC read DB causes OutOfMemoryError: Java heap space

Posted by Qihua Yang <ya...@gmail.com>.
Here are the errors:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "server-timer"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 16"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HTTP-Dispatcher"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 11"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 9"

On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <ya...@gmail.com> wrote:

> Hi,
>
> I have a flink cluster(50 hosts, each host runs a task manager).
> I am using Flink JDBC to consume data from a database. The db table is
> pretty large, around 187340000 rows. I configured the JDBC number of
> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
> Anyone know why I got OutOfMemoryError? How should I config it?
>
> Thanks,
> Qihua
>
>