You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by xiaoyue <xi...@ysstech.com> on 2022/02/25 05:37:13 UTC
flink1.14 注册mysql connector报错
flink1.14 注册mysql下车Connector报错,检查多次未发现语法错误,求助!
代码:
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);
String mysqlSink = "create table bulk_index_sink(" +
" biz_date string, " +
" dmo_index_code string, " +
" index_value string, " +
" primary key(dmo_index_code) not enforced) " +
" with (" +
" 'connector' = 'jdbc', " +
" 'username' = 'root', " +
" 'password' = 'yss300377@ZT', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
" 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);
tEnv.execute("mysql_sink_test");
报错:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "not" at line 1, column 126.
Was expecting one of:
"DISABLE" ...
"ENABLE" ...
"NORELY" ...
"NOVALIDATE" ...
"RELY" ...
"VALIDATE" ...
")" ...
"," ...
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "not" at line 1, column 126.
xiaoyue@ysstech.com
Re: Re: flink1.14 注册mysql connector报错
Posted by xiaoyue <xi...@ysstech.com>.
好的,成功入库,非常感谢您!
xiaoyue@ysstech.com
发件人: Tony Wei
发送时间: 2022-02-25 14:57
收件人: user-zh
主题: Re: Re: flink1.14 注册mysql connector报错
Hi xiaoyue,
看起來是這行造成的 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
你可能需要在執行下沉操作前將 SqlDialect 更換回 SqlDialect.DEFAULT。
best regards,
xiaoyue <xi...@ysstech.com> 於 2022年2月25日 週五 下午2:36寫道:
> Hi tony,
> 完整代码,是从hive取数据,执行flatmap, aggregate操作后再下沉到mysql。 由于篇幅, 中间的udf
> function定义过程不完整贴出了,您可以参考下,非常感谢您的帮助,麻烦啦。
>
> 代码:
> # 执行环境
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> # hive源
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> String confSite = "src\\main\\resources";
>
> String version = "3.1.2";
>
> String defaultDatabase = "fund_analysis";
>
> HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase,
> confSite, confSite, version);
>
> tEnv.registerCatalog("hive", hiveCat);
>
> tEnv.useCatalog("hive");
> # hive 取数SQL
> String biz_date = "20211130";
> String tblSource = String.format("select " +
> "coalesce(a.rate,0) as yldrate, " +
> "coalesce(c.rate,0) as riskless_yldrate, " +
> "a.ccy_type, " +
> "a.biz_date, " +
> "b.is_exch_dt, " +
> "a.pf_id " +
> "from " +
> "ts_pf_yldrate a " +
> "inner join td_gl_day b on b.dt = a.biz_date " +
> "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date
> and c.pf_id = a.pf_id " +
> "where a.biz_date <= '%s'", biz_date);
> Table table = tEnv.sqlQuery(tblSource);
>
> // 注册flatmap函数
> tEnv.createTemporarySystemFunction("RowFlatMap",
> SharpeRatioFlatMap.class);
> // 注册聚合函数
> tEnv.createTemporarySystemFunction("SharpeRatioAgg",
> SharpeRatioAggregate.class);
>
> // 执行flatmap操作
> Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
> $("riskless_yldrate"),$("ccy_type"),$("biz_date"),
> $("is_exch_dt"),$("pf_id"), biz_date));
>
> // 切换catalog,并注册表
> tEnv.useCatalog("default_catalog");
> tEnv.createTemporaryView("tagTable",tagTbl);
>
> // 调用函数SharpeRatioAgg 计算结果
> Table result = tEnv.sqlQuery(String.format("select '%s' as
> biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless,
> dmo_index_code) as index_value from tagTable group by dmo_index_code",
> biz_date));
> // result.execute().print(); (--> 该步 result 可成功打印)
>
> // 下沉操作
> String mysqlSink = "create table bulk_index_sink(" +
> " biz_date string, " +
> " dmo_index_code string, " +
> " index_value string" +
> ") with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root', " +
> " 'password' = 'xxxxxxx', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' =
> 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
> " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
>
>
> result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
> tEnv.execute("mysql_sink_test");
>
>
> xiaoyue@ysstech.com
>
> 发件人: Tony Wei
> 发送时间: 2022-02-25 14:13
> 收件人: user-zh
> 主题: Re: flink1.14 注册mysql connector报错
> Hi xiaoyue,
>
> 請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
> 我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
>
> public static void main(String[] args) {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
> " biz_date string, " +
> " dmo_index_code string, " +
> " index_value string, " +
> " primary key(dmo_index_code) not enforced) " +
> " with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root', " +
> " 'password' = 'yss300377@ZT', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' =
> 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
> " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink).print();
> // tEnv.execute("mysql_sink_test");
> }
>
> 輸出的結果為:
> +--------+
> | result |
> +--------+
> | OK |
> +--------+
> 1 row in set
>
> best regards,
>
> xiaoyue <xi...@ysstech.com> 於 2022年2月25日 週五 下午1:37寫道:
>
> > flink1.14 注册mysql下车Connector报错,检查多次未发现语法错误,求助!
> >
> > 代码:
> > env = StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings Settings =
> > EnvironmentSettings.newInstance().inBatchMode().build();
> > tEnv = StreamTableEnvironment.create(env, Settings);
> >
> > String mysqlSink = "create table bulk_index_sink(" +
> > " biz_date string, " +
> > " dmo_index_code string, " +
> > " index_value string, " +
> > " primary key(dmo_index_code) not enforced) " +
> > " with (" +
> > " 'connector' = 'jdbc', " +
> > " 'username' = 'root', " +
> > " 'password' = 'yss300377@ZT', " +
> > " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> > " 'url' = 'jdbc:mysql://
> > 192.168.100.104:3306/test?useSSL=False', " +
> > " 'table-name' = 'bulk_index_sink')";
> > tEnv.executeSql(mysqlSink);
> > tEnv.execute("mysql_sink_test");
> >
> > 报错:
> > org.apache.flink.table.api.SqlParserException: SQL parse failed.
> > Encountered "not" at line 1, column 126.
> > Was expecting one of:
> > "DISABLE" ...
> > "ENABLE" ...
> > "NORELY" ...
> > "NOVALIDATE" ...
> > "RELY" ...
> > "VALIDATE" ...
> > ")" ...
> > "," ...
> >
> >
> > at
> >
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> > at
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> > at
> >
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> > at
> >
> com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> > at
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > at
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> > at
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > at
> >
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> > at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> > at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> > at
> >
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> > at
> >
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> > at
> >
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
> > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> > "not" at line 1, column 126.
> >
> >
> >
> > xiaoyue@ysstech.com
> >
>
Re: Re: flink1.14 注册mysql connector报错
Posted by Tony Wei <to...@gmail.com>.
Hi xiaoyue,
看起來是這行造成的 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
你可能需要在執行下沉操作前將 SqlDialect 更換回 SqlDialect.DEFAULT。
best regards,
xiaoyue <xi...@ysstech.com> 於 2022年2月25日 週五 下午2:36寫道:
> Hi tony,
> 完整代码,是从hive取数据,执行flatmap, aggregate操作后再下沉到mysql。 由于篇幅, 中间的udf
> function定义过程不完整贴出了,您可以参考下,非常感谢您的帮助,麻烦啦。
>
> 代码:
> # 执行环境
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> # hive源
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> String confSite = "src\\main\\resources";
>
> String version = "3.1.2";
>
> String defaultDatabase = "fund_analysis";
>
> HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase,
> confSite, confSite, version);
>
> tEnv.registerCatalog("hive", hiveCat);
>
> tEnv.useCatalog("hive");
> # hive 取数SQL
> String biz_date = "20211130";
> String tblSource = String.format("select " +
> "coalesce(a.rate,0) as yldrate, " +
> "coalesce(c.rate,0) as riskless_yldrate, " +
> "a.ccy_type, " +
> "a.biz_date, " +
> "b.is_exch_dt, " +
> "a.pf_id " +
> "from " +
> "ts_pf_yldrate a " +
> "inner join td_gl_day b on b.dt = a.biz_date " +
> "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date
> and c.pf_id = a.pf_id " +
> "where a.biz_date <= '%s'", biz_date);
> Table table = tEnv.sqlQuery(tblSource);
>
> // 注册flatmap函数
> tEnv.createTemporarySystemFunction("RowFlatMap",
> SharpeRatioFlatMap.class);
> // 注册聚合函数
> tEnv.createTemporarySystemFunction("SharpeRatioAgg",
> SharpeRatioAggregate.class);
>
> // 执行flatmap操作
> Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
> $("riskless_yldrate"),$("ccy_type"),$("biz_date"),
> $("is_exch_dt"),$("pf_id"), biz_date));
>
> // 切换catalog,并注册表
> tEnv.useCatalog("default_catalog");
> tEnv.createTemporaryView("tagTable",tagTbl);
>
> // 调用函数SharpeRatioAgg 计算结果
> Table result = tEnv.sqlQuery(String.format("select '%s' as
> biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless,
> dmo_index_code) as index_value from tagTable group by dmo_index_code",
> biz_date));
> // result.execute().print(); (--> 该步 result 可成功打印)
>
> // 下沉操作
> String mysqlSink = "create table bulk_index_sink(" +
> " biz_date string, " +
> " dmo_index_code string, " +
> " index_value string" +
> ") with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root', " +
> " 'password' = 'xxxxxxx', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' =
> 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
> " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
>
>
> result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
> tEnv.execute("mysql_sink_test");
>
>
> xiaoyue@ysstech.com
>
> 发件人: Tony Wei
> 发送时间: 2022-02-25 14:13
> 收件人: user-zh
> 主题: Re: flink1.14 注册mysql connector报错
> Hi xiaoyue,
>
> 請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
> 我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
>
> public static void main(String[] args) {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
> " biz_date string, " +
> " dmo_index_code string, " +
> " index_value string, " +
> " primary key(dmo_index_code) not enforced) " +
> " with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root', " +
> " 'password' = 'yss300377@ZT', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' =
> 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
> " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink).print();
> // tEnv.execute("mysql_sink_test");
> }
>
> 輸出的結果為:
> +--------+
> | result |
> +--------+
> | OK |
> +--------+
> 1 row in set
>
> best regards,
>
> xiaoyue <xi...@ysstech.com> 於 2022年2月25日 週五 下午1:37寫道:
>
> > flink1.14 注册mysql下车Connector报错,检查多次未发现语法错误,求助!
> >
> > 代码:
> > env = StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings Settings =
> > EnvironmentSettings.newInstance().inBatchMode().build();
> > tEnv = StreamTableEnvironment.create(env, Settings);
> >
> > String mysqlSink = "create table bulk_index_sink(" +
> > " biz_date string, " +
> > " dmo_index_code string, " +
> > " index_value string, " +
> > " primary key(dmo_index_code) not enforced) " +
> > " with (" +
> > " 'connector' = 'jdbc', " +
> > " 'username' = 'root', " +
> > " 'password' = 'yss300377@ZT', " +
> > " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> > " 'url' = 'jdbc:mysql://
> > 192.168.100.104:3306/test?useSSL=False', " +
> > " 'table-name' = 'bulk_index_sink')";
> > tEnv.executeSql(mysqlSink);
> > tEnv.execute("mysql_sink_test");
> >
> > 报错:
> > org.apache.flink.table.api.SqlParserException: SQL parse failed.
> > Encountered "not" at line 1, column 126.
> > Was expecting one of:
> > "DISABLE" ...
> > "ENABLE" ...
> > "NORELY" ...
> > "NOVALIDATE" ...
> > "RELY" ...
> > "VALIDATE" ...
> > ")" ...
> > "," ...
> >
> >
> > at
> >
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> > at
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> > at
> >
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> > at
> >
> com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> > at
> >
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> > at
> >
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> > at
> >
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> > at
> >
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> > at
> >
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> > at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> > at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> > at
> >
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> > at
> >
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> > at
> >
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
> > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> > "not" at line 1, column 126.
> >
> >
> >
> > xiaoyue@ysstech.com
> >
>
Re: Re: flink1.14 注册mysql connector报错
Posted by xiaoyue <xi...@ysstech.com>.
Hi tony,
完整代码,是从hive取数据,执行flatmap, aggregate操作后再下沉到mysql。 由于篇幅, 中间的udf function定义过程不完整贴出了,您可以参考下,非常感谢您的帮助,麻烦啦。
代码:
# 执行环境
env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = StreamTableEnvironment.create(env, Settings);
# hive源
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
String confSite = "src\\main\\resources";
String version = "3.1.2";
String defaultDatabase = "fund_analysis";
HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, confSite, confSite, version);
tEnv.registerCatalog("hive", hiveCat);
tEnv.useCatalog("hive");
# hive 取数SQL
String biz_date = "20211130";
String tblSource = String.format("select " +
"coalesce(a.rate,0) as yldrate, " +
"coalesce(c.rate,0) as riskless_yldrate, " +
"a.ccy_type, " +
"a.biz_date, " +
"b.is_exch_dt, " +
"a.pf_id " +
"from " +
"ts_pf_yldrate a " +
"inner join td_gl_day b on b.dt = a.biz_date " +
"inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and c.pf_id = a.pf_id " +
"where a.biz_date <= '%s'", biz_date);
Table table = tEnv.sqlQuery(tblSource);
// 注册flatmap函数
tEnv.createTemporarySystemFunction("RowFlatMap", SharpeRatioFlatMap.class);
// 注册聚合函数
tEnv.createTemporarySystemFunction("SharpeRatioAgg", SharpeRatioAggregate.class);
// 执行flatmap操作
Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
$("riskless_yldrate"),$("ccy_type"),$("biz_date"),
$("is_exch_dt"),$("pf_id"), biz_date));
// 切换catalog,并注册表
tEnv.useCatalog("default_catalog");
tEnv.createTemporaryView("tagTable",tagTbl);
// 调用函数SharpeRatioAgg 计算结果
Table result = tEnv.sqlQuery(String.format("select '%s' as biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as index_value from tagTable group by dmo_index_code", biz_date));
// result.execute().print(); (--> 该步 result 可成功打印)
// 下沉操作
String mysqlSink = "create table bulk_index_sink(" +
" biz_date string, " +
" dmo_index_code string, " +
" index_value string" +
") with (" +
" 'connector' = 'jdbc', " +
" 'username' = 'root', " +
" 'password' = 'xxxxxxx', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
" 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink);
result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
tEnv.execute("mysql_sink_test");
xiaoyue@ysstech.com
发件人: Tony Wei
发送时间: 2022-02-25 14:13
收件人: user-zh
主题: Re: flink1.14 注册mysql connector报错
Hi xiaoyue,
請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, Settings);
String mysqlSink = "create table bulk_index_sink(" +
" biz_date string, " +
" dmo_index_code string, " +
" index_value string, " +
" primary key(dmo_index_code) not enforced) " +
" with (" +
" 'connector' = 'jdbc', " +
" 'username' = 'root', " +
" 'password' = 'yss300377@ZT', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' =
'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
" 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink).print();
// tEnv.execute("mysql_sink_test");
}
輸出的結果為:
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
best regards,
xiaoyue <xi...@ysstech.com> 於 2022年2月25日 週五 下午1:37寫道:
> flink1.14 注册mysql下车Connector报错,检查多次未发现语法错误,求助!
>
> 代码:
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
> " biz_date string, " +
> " dmo_index_code string, " +
> " index_value string, " +
> " primary key(dmo_index_code) not enforced) " +
> " with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root', " +
> " 'password' = 'yss300377@ZT', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' = 'jdbc:mysql://
> 192.168.100.104:3306/test?useSSL=False', " +
> " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
> tEnv.execute("mysql_sink_test");
>
> 报错:
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "not" at line 1, column 126.
> Was expecting one of:
> "DISABLE" ...
> "ENABLE" ...
> "NORELY" ...
> "NOVALIDATE" ...
> "RELY" ...
> "VALIDATE" ...
> ")" ...
> "," ...
>
>
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> at
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at
> com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "not" at line 1, column 126.
>
>
>
> xiaoyue@ysstech.com
>
Re: flink1.14 注册mysql connector报错
Posted by Tony Wei <to...@gmail.com>.
Hi xiaoyue,
請問你的測試代碼有其他額外的代碼或配置嗎?能否分享下完整的代碼和配置文件?
我嘗試在我的 IDE 上運行了下列代碼是能成功運行的。
public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings Settings =
EnvironmentSettings.newInstance().inBatchMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, Settings);
String mysqlSink = "create table bulk_index_sink(" +
" biz_date string, " +
" dmo_index_code string, " +
" index_value string, " +
" primary key(dmo_index_code) not enforced) " +
" with (" +
" 'connector' = 'jdbc', " +
" 'username' = 'root', " +
" 'password' = 'yss300377@ZT', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' =
'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
" 'table-name' = 'bulk_index_sink')";
tEnv.executeSql(mysqlSink).print();
// tEnv.execute("mysql_sink_test");
}
輸出的結果為:
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
best regards,
xiaoyue <xi...@ysstech.com> 於 2022年2月25日 週五 下午1:37寫道:
> flink1.14 注册mysql下车Connector报错,检查多次未发现语法错误,求助!
>
> 代码:
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings Settings =
> EnvironmentSettings.newInstance().inBatchMode().build();
> tEnv = StreamTableEnvironment.create(env, Settings);
>
> String mysqlSink = "create table bulk_index_sink(" +
> " biz_date string, " +
> " dmo_index_code string, " +
> " index_value string, " +
> " primary key(dmo_index_code) not enforced) " +
> " with (" +
> " 'connector' = 'jdbc', " +
> " 'username' = 'root', " +
> " 'password' = 'yss300377@ZT', " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
> " 'url' = 'jdbc:mysql://
> 192.168.100.104:3306/test?useSSL=False', " +
> " 'table-name' = 'bulk_index_sink')";
> tEnv.executeSql(mysqlSink);
> tEnv.execute("mysql_sink_test");
>
> 报错:
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "not" at line 1, column 126.
> Was expecting one of:
> "DISABLE" ...
> "ENABLE" ...
> "NORELY" ...
> "NOVALIDATE" ...
> "RELY" ...
> "VALIDATE" ...
> ")" ...
> "," ...
>
>
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> at
> org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at
> com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "not" at line 1, column 126.
>
>
>
> xiaoyue@ysstech.com
>