You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 大罗 <ti...@163.com> on 2020/09/09 07:13:24 UTC

flink sql 1.11.1 insert data to hive from kafka split into two jobs

Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下:

首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type"
:2},
{"pid":"a", "val":1, "data_type": 1, "app_type" :2}]

然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1,
"app_type" :2}

把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink())

然后,再创建临时表,比如:
tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
                $("pid"),  $("val"), $("app_type"), $("data_type"));

接着定义不同的sql,比如:
String sql1 = "insert into ods_data_10 select pid, val where data_type = 1
and app_type = 0"
String sql2 = "insert into ods_data_11 select pid, val where data_type = 1
and app_type = 1"
String sql3 = "insert into ods_data_01 select pid, val where data_type = 0
and app_type = 1"
String sql4 = "insert into ods_data_00 select pid, val where data_type = 0
and app_type = 0"

使用StatementSet运行它们:
StatementSet ss = tableEnv.createStatementSet();
ss.addInsertSql(sql1);
ss.addInsertSql(sql2);
ss.addInsertSql(sql3);
ss.addInsertSql(sql4);

最后执行作业:
env.execute(jobName);

一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图:

<http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png> 

作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA),

作业"insert-into_myhive.dw.ods_analog_sems
*******"对应的应该是写入4个表的操作(假设作业ID为jobB),如图:

<http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png> 

其中,顶端的operator的定义如下:
Source: Custom Source -> Map -> Flat Map -> Filter ->
SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
run_data_type]) -> 
(Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
_UTF-16LE'BP.%')))]) -> StreamingFileWriter, 
Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
_UTF-16LE'BP.%')))]) -> StreamingFileWriter, 
Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))]) ->
StreamingFileWriter, 
Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))]) ->
StreamingFileWriter)

我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m xxxx:8081 jobA"
会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
savepoint."
相应的停止作业jobB的时候也会生成这个savepoint。

我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

Posted by Jark Wu <im...@gmail.com>.
Hi,

目前 DataStream 和 StatementSet 没法在一个 job 中提交。 社区已经注意到这个问题,见FLINK-18840 [1],
且会在 FLIP-136 [2] 中支持。

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18840
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API?src=contextnavpagetreemode

On Thu, 10 Sep 2020 at 11:03, Qishang <zh...@gmail.com> wrote:

> Hi. 大罗
> 试一下这个方法 org.apache.flink.table.api.StatementSet#execute
> ss.execute();
>
> 大罗 <ti...@163.com> 于2020年9月9日周三 下午3:13写道:
>
> > Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下:
> >
> > 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type"
> > :2},
> > {"pid":"a", "val":1, "data_type": 1, "app_type" :2}]
> >
> > 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type":
> 1,
> > "app_type" :2}
> >
> > 把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink())
> >
> > 然后,再创建临时表,比如:
> > tableEnv.createTemporaryView("kafkaT1",
> runDataSingleOutputStreamOperator,
> >                 $("pid"),  $("val"), $("app_type"), $("data_type"));
> >
> > 接着定义不同的sql,比如:
> > String sql1 = "insert into ods_data_10 select pid, val where data_type =
> 1
> > and app_type = 0"
> > String sql2 = "insert into ods_data_11 select pid, val where data_type =
> 1
> > and app_type = 1"
> > String sql3 = "insert into ods_data_01 select pid, val where data_type =
> 0
> > and app_type = 1"
> > String sql4 = "insert into ods_data_00 select pid, val where data_type =
> 0
> > and app_type = 0"
> >
> > 使用StatementSet运行它们:
> > StatementSet ss = tableEnv.createStatementSet();
> > ss.addInsertSql(sql1);
> > ss.addInsertSql(sql2);
> > ss.addInsertSql(sql3);
> > ss.addInsertSql(sql4);
> >
> > 最后执行作业:
> > env.execute(jobName);
> >
> > 一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图:
> >
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png
> >
> >
> >
> > 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA),
> >
> > 作业"insert-into_myhive.dw.ods_analog_sems
> > *******"对应的应该是写入4个表的操作(假设作业ID为jobB),如图:
> >
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png
> >
> >
> >
> > 其中,顶端的operator的定义如下:
> > Source: Custom Source -> Map -> Flat Map -> Filter ->
> > SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
> > run_data_type]) ->
> > (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
> > _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
> > _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))])
> > ->
> > StreamingFileWriter,
> > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'HH')
> > AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))])
> > ->
> > StreamingFileWriter)
> >
> > 我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m xxxx:8081 jobA"
> > 会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
> > savepoint."
> > 相应的停止作业jobB的时候也会生成这个savepoint。
> >
> > 我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢?
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>

Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

Posted by Qishang <zh...@gmail.com>.
Hi. 大罗
试一下这个方法 org.apache.flink.table.api.StatementSet#execute
ss.execute();

大罗 <ti...@163.com> 于2020年9月9日周三 下午3:13写道:

> Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下:
>
> 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type"
> :2},
> {"pid":"a", "val":1, "data_type": 1, "app_type" :2}]
>
> 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1,
> "app_type" :2}
>
> 把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink())
>
> 然后,再创建临时表,比如:
> tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
>                 $("pid"),  $("val"), $("app_type"), $("data_type"));
>
> 接着定义不同的sql,比如:
> String sql1 = "insert into ods_data_10 select pid, val where data_type = 1
> and app_type = 0"
> String sql2 = "insert into ods_data_11 select pid, val where data_type = 1
> and app_type = 1"
> String sql3 = "insert into ods_data_01 select pid, val where data_type = 0
> and app_type = 1"
> String sql4 = "insert into ods_data_00 select pid, val where data_type = 0
> and app_type = 0"
>
> 使用StatementSet运行它们:
> StatementSet ss = tableEnv.createStatementSet();
> ss.addInsertSql(sql1);
> ss.addInsertSql(sql2);
> ss.addInsertSql(sql3);
> ss.addInsertSql(sql4);
>
> 最后执行作业:
> env.execute(jobName);
>
> 一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png>
>
>
> 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA),
>
> 作业"insert-into_myhive.dw.ods_analog_sems
> *******"对应的应该是写入4个表的操作(假设作业ID为jobB),如图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png>
>
>
> 其中,顶端的operator的定义如下:
> Source: Custom Source -> Map -> Flat Map -> Filter ->
> SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
> run_data_type]) ->
> (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter)
>
> 我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m xxxx:8081 jobA"
> 会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
> savepoint."
> 相应的停止作业jobB的时候也会生成这个savepoint。
>
> 我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>