Posted to user-zh@flink.apache.org by xiaoyue <xi...@ysstech.com> on 2022/02/25 05:37:13 UTC

flink1.14: error when registering a MySQL connector

In Flink 1.14, registering a MySQL sink connector fails with an error. I have checked it several times and cannot find a syntax error. Please help!

Code:
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
        tEnv = StreamTableEnvironment.create(env, Settings);
        
       String mysqlSink = "create table bulk_index_sink(" +
                "  biz_date string, " +
                "  dmo_index_code string, " +
                "  index_value string, " +
                "  primary key(dmo_index_code) not enforced) " +
                "  with (" +
                "   'connector' = 'jdbc', " +
                "   'username' = 'root', " +
                "   'password' = 'yss300377@ZT', " +
                "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "   'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
                "   'table-name' = 'bulk_index_sink')";       
         tEnv.executeSql(mysqlSink);
         tEnv.execute("mysql_sink_test");

Error:
        org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "not" at line 1, column 126.
Was expecting one of:
    "DISABLE" ...
    "ENABLE" ...
    "NORELY" ...
    "NOVALIDATE" ...
    "RELY" ...
    "VALIDATE" ...
    ")" ...
    "," ...
    

at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:195)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at com.yss.datamiddle.index.c001.SharpeRatioTest.udfFlatMapTest(SharpeRatioTest.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "not" at line 1, column 126.



xiaoyue@ysstech.com

Re: Re: flink1.14: error when registering a MySQL connector

Posted by xiaoyue <xi...@ysstech.com>.
Great, the data is now written to the database successfully. Thank you very much!



xiaoyue@ysstech.com
 

Re: Re: flink1.14: error when registering a MySQL connector

Posted by Tony Wei <to...@gmail.com>.
Hi xiaoyue,

It looks like this line is the cause: tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
You probably need to switch the SqlDialect back to SqlDialect.DEFAULT before running the sink step.
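
The stack trace shows the DDL going through HiveParser, and the error message lists
Hive-style constraint qualifiers (ENABLE/DISABLE/RELY/NORELY/...) as the expected
tokens after the primary key, hence the parse failure on "not". For reference, a
minimal sketch of that ordering, assuming the tEnv, mysqlSink and result variables
from the code shared in this thread (executeInsert is just one way to submit the
insert job):

    // Hive dialect only while reading from the Hive catalog
    tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    // ... register the Hive catalog and run the Hive-side queries here ...

    // switch back to the default dialect before registering the JDBC sink,
    // so "primary key(...) not enforced" is parsed by Flink's default parser
    tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    tEnv.executeSql(mysqlSink);
    result.executeInsert("bulk_index_sink");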

best regards,


Re: Re: flink1.14: error when registering a MySQL connector

Posted by xiaoyue <xi...@ysstech.com>.
Hi tony,
   Here is the full code: it reads data from Hive, runs the flatmap and aggregate operations, and then sinks the result to MySQL. For brevity, the definitions of the intermediate UDF functions are not pasted in full; please use this as a reference. Thank you very much for your help!

Code:
        // execution environment
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
        tEnv = StreamTableEnvironment.create(env, Settings);
        
        // Hive source
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        String confSite = "src\\main\\resources";

        String version = "3.1.2";

        String defaultDatabase = "fund_analysis";

        HiveCatalog hiveCat = new HiveCatalog("hive", defaultDatabase, confSite, confSite, version);

        tEnv.registerCatalog("hive", hiveCat);

        tEnv.useCatalog("hive");
        // SQL that reads from Hive
        String biz_date = "20211130";
        String tblSource = String.format("select " +
                "coalesce(a.rate,0) as yldrate, " +
                "coalesce(c.rate,0) as riskless_yldrate, " +
                "a.ccy_type, " +
                "a.biz_date, " +
                "b.is_exch_dt, " +
                "a.pf_id " +
                "from " +
                "ts_pf_yldrate a " +
                "inner join td_gl_day b on b.dt = a.biz_date " +
                "inner join ts_pf_bm_yldrate c on c.biz_date = a.biz_date and c.pf_id = a.pf_id " +
                "where a.biz_date <= '%s'", biz_date);
        Table table = tEnv.sqlQuery(tblSource);
        
        // register the flatmap function
        tEnv.createTemporarySystemFunction("RowFlatMap", SharpeRatioFlatMap.class);
        // register the aggregate function
        tEnv.createTemporarySystemFunction("SharpeRatioAgg", SharpeRatioAggregate.class);
        
        // run the flatmap operation
        Table tagTbl = table.flatMap(call("RowFlatMap",$("yldrate"),
                $("riskless_yldrate"),$("ccy_type"),$("biz_date"),
                $("is_exch_dt"),$("pf_id"), biz_date));
       
        // switch the catalog and register the table
        tEnv.useCatalog("default_catalog");
        tEnv.createTemporaryView("tagTable",tagTbl);
        
        // call SharpeRatioAgg to compute the result
         Table result = tEnv.sqlQuery(String.format("select '%s' as biz_date, dmo_index_code, SharpeRatioAgg(yldrate, yldrate_riskless, dmo_index_code) as index_value from tagTable group by dmo_index_code", biz_date));
        // result.execute().print(); (--> at this step, result prints successfully)
        
        // sink operation
        String mysqlSink = "create table bulk_index_sink(" +
                "  biz_date string, " +
                "  dmo_index_code string, " +
                "  index_value string" +
                ") with (" +
                "   'connector' = 'jdbc', " +
                "   'username' = 'root', " +
                "   'password' = 'xxxxxxx', " +
                "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "   'url' = 'jdbc:mysql://hadoop104:3306/test?useSSL=False', " +
                "   'table-name' = 'bulk_index_sink')";
        tEnv.executeSql(mysqlSink);

        result.select("biz_date,dmo_index_code,index_value").insertInto("bulk_index_sink");
        tEnv.execute("mysql_sink_test");


xiaoyue@ysstech.com
 

Re: flink1.14: error when registering a MySQL connector

Posted by Tony Wei <to...@gmail.com>.
Hi xiaoyue,

Does your test code have any other additional code or configuration? Could you share the complete code and configuration files?
I tried running the following code in my IDE and it ran successfully.

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings Settings = EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, Settings);

        String mysqlSink = "create table bulk_index_sink(" +
                "  biz_date string, " +
                "  dmo_index_code string, " +
                "  index_value string, " +
                "  primary key(dmo_index_code) not enforced) " +
                "  with (" +
                "   'connector' = 'jdbc', " +
                "   'username' = 'root', " +
                "   'password' = 'yss300377@ZT', " +
                "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
                "   'url' = 'jdbc:mysql://192.168.100.104:3306/test?useSSL=False', " +
                "   'table-name' = 'bulk_index_sink')";
        tEnv.executeSql(mysqlSink).print();
        // tEnv.execute("mysql_sink_test");
    }

The output is:
+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set

best regards,
