Posted to user@flink.apache.org by yelun <98...@qq.com> on 2018/11/14 02:34:31 UTC

How flink table api to join with mysql dimtable

hi,

I want to use Flink SQL to left join a real-time stream with a static dimension table stored in MySQL. To do this, I read the MySQL table as a DataStream, registered it as a Flink table, and joined it with the real-time stream, which is also registered as a Flink table. However, records that arrive at the beginning of the real-time stream are not joined correctly with the MySQL data, while later records are. Is there a good way, using the Table API, to join the real-time stream against MySQL data that is fully loaded into memory first and then reloaded once per hour?

The following is some example code:

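import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// DRIVER_CLASS, DB_URL, USER_NAME, USER_PASS, SELECT_ALL_PERSONS, ROW_TYPE_INFO
// and ROW_TYPE_INFO_OUT are constants defined elsewhere (not shown).
public static JDBCInputFormat.JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
	.setDrivername(DRIVER_CLASS)
	.setDBUrl(DB_URL)
	.setUsername(USER_NAME)
	.setPassword(USER_PASS)
	.setQuery(SELECT_ALL_PERSONS)
	.setRowTypeInfo(ROW_TYPE_INFO);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Real-time stream of orders, registered as tableA.
DataStream<Order> orderA = env.addSource(new OrderFunction());
tEnv.registerDataStream("tableA", orderA, "name, product, amount");

// MySQL dimension table, read once through JDBC as a bounded stream.
DataStream<Row> mysql_table = env.createInput(inputBuilder.finish());
String[] dim_table_fields = {"id", "name", "age", "address"}; // not used below

tEnv.registerDataStream("tableB", mysql_table);
Table result = tEnv.sqlQuery("SELECT tableA.name, tableA.amount, tableB.age, tableB.address FROM tableB JOIN tableA ON tableA.name = tableB.name");
tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();
env.execute();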

Thanks a lot.

Re: How flink table api to join with mysql dimtable

Posted by Hequn Cheng <ch...@gmail.com>.
Hi yelun,

Currently, there is no direct way to dynamically load data and join against
it in Flink SQL. As a workaround, you can implement your logic with a UDTF
(user-defined table function). In the UDTF, you can load the data into a
cache and update it according to your requirements.
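
The caching part of this workaround could be sketched roughly as below. This
is only a sketch under stated assumptions, not an implementation taken from
Flink or from this thread: the class name ReloadingDimCache, the loader
callback (which would run the MySQL query) and the injectable clock are all
hypothetical, and the code deliberately avoids Flink classes so that it can
be compiled on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Flink): keeps a full in-memory copy of a
 * small dimension table and reloads it once the copy is older than a fixed
 * interval, for example one hour.
 */
final class ReloadingDimCache<K, V> {
    private final Supplier<Map<K, V>> loader;   // assumed to run the JDBC query
    private final long reloadIntervalMillis;    // e.g. 3_600_000 for one hour
    private final LongSupplier clock;           // e.g. System::currentTimeMillis

    private Map<K, V> snapshot = Collections.emptyMap();
    private long lastLoadMillis;
    private boolean loaded;

    ReloadingDimCache(Supplier<Map<K, V>> loader,
                      long reloadIntervalMillis,
                      LongSupplier clock) {
        this.loader = loader;
        this.reloadIntervalMillis = reloadIntervalMillis;
        this.clock = clock;
    }

    /** Returns the cached value for the key, or null if the key is absent. */
    synchronized V lookup(K key) {
        long now = clock.getAsLong();
        // Load on first use, then again whenever the snapshot is too old.
        if (!loaded || now - lastLoadMillis >= reloadIntervalMillis) {
            snapshot = new HashMap<>(loader.get()); // defensive copy
            lastLoadMillis = now;
            loaded = true;
        }
        return snapshot.get(key);
    }
}
```

In a UDTF, one would typically create such a cache in open(), call lookup()
from eval(), and emit a row with collect() only when the key is found. The
function can then be used from SQL with LEFT JOIN LATERAL TABLE(...) ON TRUE,
so that stream records without a match are still kept. Because the first
lookup triggers the load, no stream record is processed before the dimension
data is in memory. Note that in this sketch the reload runs inside lookup(),
so the record that triggers it waits for the query to finish.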

Best, Hequn
