You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 琴师 <11...@qq.com> on 2021/06/01 08:33:50 UTC

Pyflink jdbc相关

Hi,
&nbsp; &nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1
我的原代码如下:


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")


# 2. create source Table
table_env.execute_sql("""


CREATE TABLE table_source (
&nbsp; e string
) WITH (
&nbsp;'connector' = 'jdbc',
&nbsp; 'url' = 'jdbc:mysql://********:3306/test',
&nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&nbsp; 'table-name' = 'enum_test',
&nbsp; 'username' = 'pms_etl',
&nbsp; 'password' = 'pms_etl_q'
)


""")


# 3. create sink Table
table_env.execute_sql("""
&nbsp; &nbsp; CREATE TABLE print (
&nbsp; &nbsp; &nbsp; &nbsp; e string
&nbsp; &nbsp; ) WITH (
&nbsp; &nbsp; &nbsp; &nbsp; 'connector' = 'print'
&nbsp; &nbsp; )
""")
&nbsp; &nbsp;


table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()



我直接用python执行时候错误返回如下


Traceback (most recent call last):
&nbsp; File "demo.py", line 41, in <module&gt;
&nbsp; &nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
&nbsp; &nbsp; return TableResult(self._j_tenv.executeSql(stmt))
&nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
&nbsp; &nbsp; answer, self.gateway_client, self.target_id, self.name)
&nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
&nbsp; &nbsp; return f(*a, **kw)
&nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
&nbsp; &nbsp; format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.


Table options are:


'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='pms_etl_q'
'table-name'='enum_test'
'url'='jdbc:mysql://*******:3306/test'
'username'='pms_etl'
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&nbsp; &nbsp; &nbsp; &nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&nbsp; &nbsp; &nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
&nbsp; &nbsp; &nbsp; &nbsp; ... 31 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.


Available factory identifiers are:


blackhole
datagen
filesystem
print
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
&nbsp; &nbsp; &nbsp; &nbsp; ... 33 more



我用flink run -py demo.py 返回错误如下:


&nbsp; File "./demo.py", line 41, in <module&gt;
&nbsp; &nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
&nbsp; File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
&nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.Iterator.foreach(Iterator.scala:937)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.Iterator.foreach$(Iterator.scala:937)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.IterableLike.foreach(IterableLike.scala:70)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike.map(TraversableLike.scala:233)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
&nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&nbsp; &nbsp; &nbsp; &nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&nbsp; &nbsp; &nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)


org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&nbsp; &nbsp; &nbsp; &nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
&nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
&nbsp; &nbsp; &nbsp; &nbsp; ... 13 more



请问我该如何解决?

回复: Pyflink jdbc相关

Posted by 琴师 <11...@qq.com>.
已解决,非常感谢!




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <dian0511.fu@gmail.com&gt;;
发送时间:&nbsp;2021年6月1日(星期二) 晚上7:21
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"琴师"<1129656513@qq.com&gt;;

主题:&nbsp;Re: Pyflink jdbc相关



这样试试,把”\”改成”/“:

file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar



&gt; 2021年6月1日 下午5:40,琴师 <1129656513@qq.com&gt; 写道:
&gt; 
&gt; 再请叫一个问题,我在pycharm使用时候引用windows的地址不能引用,比如file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar;&nbsp; 这样不能引入,大佬有用用过吗?
&gt; 
&gt; 
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "琴师"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <1129656513@qq.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2021年6月1日(星期二) 下午5:30
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt; 
&gt; 主题:&amp;nbsp;回复: Pyflink jdbc相关
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 感谢,我换成2.11确实可以了!!!!
&gt; 
&gt; 
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <dian0511.fu@gmail.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2021年6月1日(星期二) 下午5:04
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;"琴师"<1129656513@qq.com&amp;gt;;
&gt; 
&gt; 主题:&amp;nbsp;Re: Pyflink jdbc相关
&gt; 
&gt; 
&gt; 
&gt; Hi,
&gt; 
&gt; 本地执行:
&gt; 1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的
&gt; 
&gt; 
&gt; flink run:
&gt; 1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。
&gt; 
&gt; 
&gt; &amp;gt; 2021年6月1日 下午4:33,琴师 <1129656513@qq.com&amp;gt; 写道:
&gt; &amp;gt; 
&gt; &amp;gt; Hi,
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1
&gt; &amp;gt; 我的原代码如下:
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; from pyflink.datastream import StreamExecutionEnvironment
&gt; &amp;gt; from pyflink.table import StreamTableEnvironment, EnvironmentSettings
&gt; &amp;gt; env = StreamExecutionEnvironment.get_execution_environment()
&gt; &amp;gt; table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
&gt; &amp;gt; table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; # 2. create source Table
&gt; &amp;gt; table_env.execute_sql("""
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; CREATE TABLE table_source (
&gt; &amp;gt; &amp;amp;nbsp; e string
&gt; &amp;gt; ) WITH (
&gt; &amp;gt; &amp;amp;nbsp;'connector' = 'jdbc',
&gt; &amp;gt; &amp;amp;nbsp; 'url' = 'jdbc:mysql://********:3306/test',
&gt; &amp;gt; &amp;amp;nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt; &amp;gt; &amp;amp;nbsp; 'table-name' = 'enum_test',
&gt; &amp;gt; &amp;amp;nbsp; 'username' = 'pms_etl',
&gt; &amp;gt; &amp;amp;nbsp; 'password' = 'pms_etl_q'
&gt; &amp;gt; )
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; """)
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; # 3. create sink Table
&gt; &amp;gt; table_env.execute_sql("""
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; CREATE TABLE print (
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; e string
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; ) WITH (
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 'connector' = 'print'
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; )
&gt; &amp;gt; """)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 我直接用python执行时候错误返回如下
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; Traceback (most recent call last):
&gt; &amp;gt; &amp;amp;nbsp; File "demo.py", line 41, in <module&amp;amp;gt;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;gt; &amp;amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; return TableResult(self._j_tenv.executeSql(stmt))
&gt; &amp;gt; &amp;amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; answer, self.gateway_client, self.target_id, self.name)
&gt; &amp;gt; &amp;amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; return f(*a, **kw)
&gt; &amp;gt; &amp;amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; format(target_id, ".", name), value)
&gt; &amp;gt; py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
&gt; &amp;gt; : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; Table options are:
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 'connector'='jdbc'
&gt; &amp;gt; 'driver'='com.mysql.cj.jdbc.Driver'
&gt; &amp;gt; 'password'='pms_etl_q'
&gt; &amp;gt; 'table-name'='enum_test'
&gt; &amp;gt; 'url'='jdbc:mysql://*******:3306/test'
&gt; &amp;gt; 'username'='pms_etl'
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; &amp;gt; Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; ... 31 more
&gt; &amp;gt; Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; Available factory identifiers are:
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; blackhole
&gt; &amp;gt; datagen
&gt; &amp;gt; filesystem
&gt; &amp;gt; print
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; ... 33 more
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 我用flink run -py demo.py 返回错误如下:
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; &amp;amp;nbsp; File "./demo.py", line 41, in <module&amp;amp;gt;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;gt; &amp;amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
&gt; &amp;gt; &amp;amp;nbsp; File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
&gt; &amp;gt; &amp;amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
&gt; &amp;gt; pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.Iterator.foreach(Iterator.scala:937)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.Iterator.foreach$(Iterator.scala:937)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.IterableLike.foreach(IterableLike.scala:70)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.TraversableLike.map(TraversableLike.scala:233)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
&gt; &amp;gt; Caused by: java.lang.RuntimeException: Python process exits with code: 1
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; ... 13 more
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 请问我该如何解决?

Re: Pyflink jdbc相关

Posted by Dian Fu <di...@gmail.com>.
这样试试,把”\”改成”/“:

file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar



> 2021年6月1日 下午5:40,琴师 <11...@qq.com> 写道:
> 
> 再请叫一个问题,我在pycharm使用时候引用windows的地址不能引用,比如file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar;  这样不能引入,大佬有用用过吗?
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "琴师"                                                                                    <1129656513@qq.com&gt;;
> 发送时间:&nbsp;2021年6月1日(星期二) 下午5:30
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> 
> 主题:&nbsp;回复: Pyflink jdbc相关
> 
> 
> 
> 
> 
> 感谢,我换成2.11确实可以了!!!!
> 
> 
> ------------------ 原始邮件 ------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <dian0511.fu@gmail.com&gt;;
> 发送时间:&nbsp;2021年6月1日(星期二) 下午5:04
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"琴师"<1129656513@qq.com&gt;;
> 
> 主题:&nbsp;Re: Pyflink jdbc相关
> 
> 
> 
> Hi,
> 
> 本地执行:
> 1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的
> 
> 
> flink run:
> 1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。
> 
> 
> &gt; 2021年6月1日 下午4:33,琴师 <1129656513@qq.com&gt; 写道:
> &gt; 
> &gt; Hi,
> &gt; &amp;nbsp; &amp;nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1
> &gt; 我的原代码如下:
> &gt; 
> &gt; 
> &gt; from pyflink.datastream import StreamExecutionEnvironment
> &gt; from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> &gt; env = StreamExecutionEnvironment.get_execution_environment()
> &gt; table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
> &gt; table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
> &gt; 
> &gt; 
> &gt; # 2. create source Table
> &gt; table_env.execute_sql("""
> &gt; 
> &gt; 
> &gt; CREATE TABLE table_source (
> &gt; &amp;nbsp; e string
> &gt; ) WITH (
> &gt; &amp;nbsp;'connector' = 'jdbc',
> &gt; &amp;nbsp; 'url' = 'jdbc:mysql://********:3306/test',
> &gt; &amp;nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
> &gt; &amp;nbsp; 'table-name' = 'enum_test',
> &gt; &amp;nbsp; 'username' = 'pms_etl',
> &gt; &amp;nbsp; 'password' = 'pms_etl_q'
> &gt; )
> &gt; 
> &gt; 
> &gt; """)
> &gt; 
> &gt; 
> &gt; # 3. create sink Table
> &gt; table_env.execute_sql("""
> &gt; &amp;nbsp; &amp;nbsp; CREATE TABLE print (
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e string
> &gt; &amp;nbsp; &amp;nbsp; ) WITH (
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 'connector' = 'print'
> &gt; &amp;nbsp; &amp;nbsp; )
> &gt; """)
> &gt; &amp;nbsp; &amp;nbsp;
> &gt; 
> &gt; 
> &gt; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> &gt; 
> &gt; 
> &gt; 
> &gt; 我直接用python执行时候错误返回如下
> &gt; 
> &gt; 
> &gt; Traceback (most recent call last):
> &gt; &amp;nbsp; File "demo.py", line 41, in <module&amp;gt;
> &gt; &amp;nbsp; &amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> &gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
> &gt; &amp;nbsp; &amp;nbsp; return TableResult(self._j_tenv.executeSql(stmt))
> &gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
> &gt; &amp;nbsp; &amp;nbsp; answer, self.gateway_client, self.target_id, self.name)
> &gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
> &gt; &amp;nbsp; &amp;nbsp; return f(*a, **kw)
> &gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
> &gt; &amp;nbsp; &amp;nbsp; format(target_id, ".", name), value)
> &gt; py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> &gt; : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.
> &gt; 
> &gt; 
> &gt; Table options are:
> &gt; 
> &gt; 
> &gt; 'connector'='jdbc'
> &gt; 'driver'='com.mysql.cj.jdbc.Driver'
> &gt; 'password'='pms_etl_q'
> &gt; 'table-name'='enum_test'
> &gt; 'url'='jdbc:mysql://*******:3306/test'
> &gt; 'username'='pms_etl'
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
> &gt; Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 31 more
> &gt; Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> &gt; 
> &gt; 
> &gt; Available factory identifiers are:
> &gt; 
> &gt; 
> &gt; blackhole
> &gt; datagen
> &gt; filesystem
> &gt; print
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 33 more
> &gt; 
> &gt; 
> &gt; 
> &gt; 我用flink run -py demo.py 返回错误如下:
> &gt; 
> &gt; 
> &gt; &amp;nbsp; File "./demo.py", line 41, in <module&amp;gt;
> &gt; &amp;nbsp; &amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> &gt; &amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
> &gt; &amp;nbsp; File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
> &gt; &amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
> &gt; pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.Iterator.foreach(Iterator.scala:937)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.Iterator.foreach$(Iterator.scala:937)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
> &gt; 
> &gt; 
> &gt; org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> &gt; Caused by: java.lang.RuntimeException: Python process exits with code: 1
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 13 more
> &gt; 
> &gt; 
> &gt; 
> &gt; 请问我该如何解决?


回复: Pyflink jdbc相关

Posted by 琴师 <11...@qq.com>.
再请叫一个问题,我在pycharm使用时候引用windows的地址不能引用,比如file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar;  这样不能引入,大佬有用用过吗?


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "琴师"                                                                                    <1129656513@qq.com&gt;;
发送时间:&nbsp;2021年6月1日(星期二) 下午5:30
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: Pyflink jdbc相关





感谢,我换成2.11确实可以了!!!!


------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <dian0511.fu@gmail.com&gt;;
发送时间:&nbsp;2021年6月1日(星期二) 下午5:04
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"琴师"<1129656513@qq.com&gt;;

主题:&nbsp;Re: Pyflink jdbc相关



Hi,

本地执行:
1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的


flink run:
1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。


&gt; 2021年6月1日 下午4:33,琴师 <1129656513@qq.com&gt; 写道:
&gt; 
&gt; Hi,
&gt; &amp;nbsp; &amp;nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1
&gt; 我的原代码如下:
&gt; 
&gt; 
&gt; from pyflink.datastream import StreamExecutionEnvironment
&gt; from pyflink.table import StreamTableEnvironment, EnvironmentSettings
&gt; env = StreamExecutionEnvironment.get_execution_environment()
&gt; table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
&gt; table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
&gt; 
&gt; 
&gt; # 2. create source Table
&gt; table_env.execute_sql("""
&gt; 
&gt; 
&gt; CREATE TABLE table_source (
&gt; &amp;nbsp; e string
&gt; ) WITH (
&gt; &amp;nbsp;'connector' = 'jdbc',
&gt; &amp;nbsp; 'url' = 'jdbc:mysql://********:3306/test',
&gt; &amp;nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt; &amp;nbsp; 'table-name' = 'enum_test',
&gt; &amp;nbsp; 'username' = 'pms_etl',
&gt; &amp;nbsp; 'password' = 'pms_etl_q'
&gt; )
&gt; 
&gt; 
&gt; """)
&gt; 
&gt; 
&gt; # 3. create sink Table
&gt; table_env.execute_sql("""
&gt; &amp;nbsp; &amp;nbsp; CREATE TABLE print (
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e string
&gt; &amp;nbsp; &amp;nbsp; ) WITH (
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 'connector' = 'print'
&gt; &amp;nbsp; &amp;nbsp; )
&gt; """)
&gt; &amp;nbsp; &amp;nbsp;
&gt; 
&gt; 
&gt; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; 
&gt; 
&gt; 
&gt; 我直接用python执行时候错误返回如下
&gt; 
&gt; 
&gt; Traceback (most recent call last):
&gt; &amp;nbsp; File "demo.py", line 41, in <module&amp;gt;
&gt; &amp;nbsp; &amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
&gt; &amp;nbsp; &amp;nbsp; return TableResult(self._j_tenv.executeSql(stmt))
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
&gt; &amp;nbsp; &amp;nbsp; answer, self.gateway_client, self.target_id, self.name)
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
&gt; &amp;nbsp; &amp;nbsp; return f(*a, **kw)
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
&gt; &amp;nbsp; &amp;nbsp; format(target_id, ".", name), value)
&gt; py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
&gt; : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.
&gt; 
&gt; 
&gt; Table options are:
&gt; 
&gt; 
&gt; 'connector'='jdbc'
&gt; 'driver'='com.mysql.cj.jdbc.Driver'
&gt; 'password'='pms_etl_q'
&gt; 'table-name'='enum_test'
&gt; 'url'='jdbc:mysql://*******:3306/test'
&gt; 'username'='pms_etl'
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 31 more
&gt; Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
&gt; 
&gt; 
&gt; Available factory identifiers are:
&gt; 
&gt; 
&gt; blackhole
&gt; datagen
&gt; filesystem
&gt; print
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 33 more
&gt; 
&gt; 
&gt; 
&gt; 我用flink run -py demo.py 返回错误如下:
&gt; 
&gt; 
&gt; &amp;nbsp; File "./demo.py", line 41, in <module&amp;gt;
&gt; &amp;nbsp; &amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
&gt; &amp;nbsp; File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
&gt; &amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
&gt; pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.Iterator.foreach(Iterator.scala:937)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.Iterator.foreach$(Iterator.scala:937)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.IterableLike.foreach(IterableLike.scala:70)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.map(TraversableLike.scala:233)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; 
&gt; 
&gt; org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
&gt; Caused by: java.lang.RuntimeException: Python process exits with code: 1
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 13 more
&gt; 
&gt; 
&gt; 
&gt; 请问我该如何解决?

回复: Pyflink jdbc相关

Posted by 琴师 <11...@qq.com>.
感谢,我换成2.11确实可以了!!!!


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <dian0511.fu@gmail.com&gt;;
发送时间:&nbsp;2021年6月1日(星期二) 下午5:04
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"琴师"<1129656513@qq.com&gt;;

主题:&nbsp;Re: Pyflink jdbc相关



Hi,

本地执行:
1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的


flink run:
1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。


&gt; 2021年6月1日 下午4:33,琴师 <1129656513@qq.com&gt; 写道:
&gt; 
&gt; Hi,
&gt; &amp;nbsp; &amp;nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1
&gt; 我的原代码如下:
&gt; 
&gt; 
&gt; from pyflink.datastream import StreamExecutionEnvironment
&gt; from pyflink.table import StreamTableEnvironment, EnvironmentSettings
&gt; env = StreamExecutionEnvironment.get_execution_environment()
&gt; table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
&gt; table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
&gt; 
&gt; 
&gt; # 2. create source Table
&gt; table_env.execute_sql("""
&gt; 
&gt; 
&gt; CREATE TABLE table_source (
&gt; &amp;nbsp; e string
&gt; ) WITH (
&gt; &amp;nbsp;'connector' = 'jdbc',
&gt; &amp;nbsp; 'url' = 'jdbc:mysql://********:3306/test',
&gt; &amp;nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt; &amp;nbsp; 'table-name' = 'enum_test',
&gt; &amp;nbsp; 'username' = 'pms_etl',
&gt; &amp;nbsp; 'password' = 'pms_etl_q'
&gt; )
&gt; 
&gt; 
&gt; """)
&gt; 
&gt; 
&gt; # 3. create sink Table
&gt; table_env.execute_sql("""
&gt; &amp;nbsp; &amp;nbsp; CREATE TABLE print (
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e string
&gt; &amp;nbsp; &amp;nbsp; ) WITH (
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 'connector' = 'print'
&gt; &amp;nbsp; &amp;nbsp; )
&gt; """)
&gt; &amp;nbsp; &amp;nbsp;
&gt; 
&gt; 
&gt; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; 
&gt; 
&gt; 
&gt; 我直接用python执行时候错误返回如下
&gt; 
&gt; 
&gt; Traceback (most recent call last):
&gt; &amp;nbsp; File "demo.py", line 41, in <module&amp;gt;
&gt; &amp;nbsp; &amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
&gt; &amp;nbsp; &amp;nbsp; return TableResult(self._j_tenv.executeSql(stmt))
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
&gt; &amp;nbsp; &amp;nbsp; answer, self.gateway_client, self.target_id, self.name)
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
&gt; &amp;nbsp; &amp;nbsp; return f(*a, **kw)
&gt; &amp;nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
&gt; &amp;nbsp; &amp;nbsp; format(target_id, ".", name), value)
&gt; py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
&gt; : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.
&gt; 
&gt; 
&gt; Table options are:
&gt; 
&gt; 
&gt; 'connector'='jdbc'
&gt; 'driver'='com.mysql.cj.jdbc.Driver'
&gt; 'password'='pms_etl_q'
&gt; 'table-name'='enum_test'
&gt; 'url'='jdbc:mysql://*******:3306/test'
&gt; 'username'='pms_etl'
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 31 more
&gt; Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
&gt; 
&gt; 
&gt; Available factory identifiers are:
&gt; 
&gt; 
&gt; blackhole
&gt; datagen
&gt; filesystem
&gt; print
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 33 more
&gt; 
&gt; 
&gt; 
&gt; 我用flink run -py demo.py 返回错误如下:
&gt; 
&gt; 
&gt; &amp;nbsp; File "./demo.py", line 41, in <module&amp;gt;
&gt; &amp;nbsp; &amp;nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
&gt; &amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
&gt; &amp;nbsp; File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
&gt; &amp;nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
&gt; pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.Iterator.foreach(Iterator.scala:937)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.Iterator.foreach$(Iterator.scala:937)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.IterableLike.foreach(IterableLike.scala:70)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.map(TraversableLike.scala:233)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.Thread.run(Thread.java:748)
&gt; 
&gt; 
&gt; org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
&gt; Caused by: java.lang.RuntimeException: Python process exits with code: 1
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ... 13 more
&gt; 
&gt; 
&gt; 
&gt; 请问我该如何解决?

Re: Pyflink jdbc相关

Posted by Dian Fu <di...@gmail.com>.
Hi,

本地执行:
1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的


flink run:
1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。


> 2021年6月1日 下午4:33,琴师 <11...@qq.com> 写道:
> 
> Hi,
> &nbsp; &nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1
> 我的原代码如下:
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> env = StreamExecutionEnvironment.get_execution_environment()
> table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
> table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")
> 
> 
> # 2. create source Table
> table_env.execute_sql("""
> 
> 
> CREATE TABLE table_source (
> &nbsp; e string
> ) WITH (
> &nbsp;'connector' = 'jdbc',
> &nbsp; 'url' = 'jdbc:mysql://********:3306/test',
> &nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
> &nbsp; 'table-name' = 'enum_test',
> &nbsp; 'username' = 'pms_etl',
> &nbsp; 'password' = 'pms_etl_q'
> )
> 
> 
> """)
> 
> 
> # 3. create sink Table
> table_env.execute_sql("""
> &nbsp; &nbsp; CREATE TABLE print (
> &nbsp; &nbsp; &nbsp; &nbsp; e string
> &nbsp; &nbsp; ) WITH (
> &nbsp; &nbsp; &nbsp; &nbsp; 'connector' = 'print'
> &nbsp; &nbsp; )
> """)
> &nbsp; &nbsp;
> 
> 
> table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> 
> 
> 
> 我直接用python执行时候错误返回如下
> 
> 
> Traceback (most recent call last):
> &nbsp; File "demo.py", line 41, in <module&gt;
> &nbsp; &nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> &nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
> &nbsp; &nbsp; return TableResult(self._j_tenv.executeSql(stmt))
> &nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
> &nbsp; &nbsp; answer, self.gateway_client, self.target_id, self.name)
> &nbsp; File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
> &nbsp; &nbsp; return f(*a, **kw)
> &nbsp; File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
> &nbsp; &nbsp; format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.
> 
> 
> Table options are:
> 
> 
> 'connector'='jdbc'
> 'driver'='com.mysql.cj.jdbc.Driver'
> 'password'='pms_etl_q'
> 'table-name'='enum_test'
> 'url'='jdbc:mysql://*******:3306/test'
> 'username'='pms_etl'
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &nbsp; &nbsp; &nbsp; &nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> &nbsp; &nbsp; &nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
> &nbsp; &nbsp; &nbsp; &nbsp; ... 31 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> 
> 
> Available factory identifiers are:
> 
> 
> blackhole
> datagen
> filesystem
> print
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
> &nbsp; &nbsp; &nbsp; &nbsp; ... 33 more
> 
> 
> 
> 我用flink run -py demo.py 返回错误如下:
> 
> 
> &nbsp; File "./demo.py", line 41, in <module&gt;
> &nbsp; &nbsp; table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> &nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
> &nbsp; File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
> &nbsp; File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.Iterator.foreach(Iterator.scala:937)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.Iterator.foreach$(Iterator.scala:937)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> &nbsp; &nbsp; &nbsp; &nbsp; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &nbsp; &nbsp; &nbsp; &nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> &nbsp; &nbsp; &nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)
> 
> 
> org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> &nbsp; &nbsp; &nbsp; &nbsp; at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> &nbsp; &nbsp; &nbsp; &nbsp; at java.lang.reflect.Method.invoke(Method.java:498)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
> &nbsp; &nbsp; &nbsp; &nbsp; at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> &nbsp; &nbsp; &nbsp; &nbsp; ... 13 more
> 
> 
> 
> 请问我该如何解决?