You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Qihua Yang <ya...@gmail.com> on 2022/02/08 21:08:48 UTC

Flink JDBC connector behavior

Hi,

We are using flink jdbc connector to read whole database table line by
line. A few things I don't quite understand.
We configured BATCH_SIZE=100 and PARTITION_NUM=1000, table is pretty big.
What is the flink internal behavior to read data from table?
Flink read BATCH_SIZE data each time? Or it read (tableSize/PARTITION_NUM)
data each time? Or it read whole table into memory each time?
database metrics show the sql latency is extremely high, almost 20s.
is there any way to optimize it?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(esSinkConf.dbLowerBound)
    .setPartitionUpperBound(esSinkConf.dbUpperBound)
    .setNumPartitions(PARTITION_NUM)
    .setFetchSize(BATCH_SIZE)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)
    .setMaxRetryTimes(2)
    .build()
val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build().getDataStream(env)

Re: Flink JDBC connector behavior

Posted by Roman Khachatryan <ro...@apache.org>.
Hi,

scan.partition.num (the number of partitions [1]) translates into
parallel queries to the database (with different to/from). Batch size
is further calculated from lower and upper bounds and the number of
partitions.

scan.fetch-size hints JDBC driver to adjust the fetch size (see [2]).

The first thing I'd check is that you actually have the configured
number of parallel queries and then probably check if there is an
index on the partition column.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/#partitioned-scan
[2]
https://docs.oracle.com/javase/7/docs/api/java/sql/Statement.html#setFetchSize(int)

Regards,
Roman

On Tue, Feb 8, 2022 at 10:09 PM Qihua Yang <ya...@gmail.com> wrote:
>
> Hi,
>
> We are using flink jdbc connector to read whole database table line by line. A few things I don't quite understand.
> We configured BATCH_SIZE=100 and PARTITION_NUM=1000, table is pretty big.
> What is the flink internal behavior to read data from table?
> Flink read BATCH_SIZE data each time? Or it read (tableSize/PARTITION_NUM) data each time? Or it read whole table into memory each time?
> database metrics show the sql latency is extremely high, almost 20s.
> is there any way to optimize it?
>
> val query = String.format("SELECT * FROM %s", tableName)
>
> val options = JdbcOptions.builder()
>     .setDBUrl(url)
>     .setTableName(tableName)
>     .setDriverName(DRIVER_NAME)
>     .setUsername(userName)
>     .setPassword(password)
>     .build()
> val readOptions = JdbcReadOptions.builder()
>     .setQuery(query)
>     .setPartitionColumnName(PARTITION_KEY)
>     .setPartitionLowerBound(esSinkConf.dbLowerBound)
>     .setPartitionUpperBound(esSinkConf.dbUpperBound)
>     .setNumPartitions(PARTITION_NUM)
>     .setFetchSize(BATCH_SIZE)
>     .build()
> val lookupOptions = JdbcLookupOptions.builder()
>     .setCacheMaxSize(-1)
>     .setCacheExpireMs(CACHE_SIZE)
>     .setMaxRetryTimes(2)
>     .build()
> val rawSource = JdbcTableSource.builder()
>     .setOptions(options)
>     .setReadOptions(readOptions)
>     .setLookupOptions(lookupOptions)
>     .setSchema(schema)
>     .build().getDataStream(env)
>
>