Posted to user-zh@flink.apache.org by 越张 <zh...@silvrr.com> on 2019/09/10 04:31:13 UTC
Flink 1.9 SQL job submission fails
Code:
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);
tableEnv.connect(new Kafka()
        .version("0.10")
        .topic("installmentdb_t_user")
        .startFromEarliest()
        .property("zookeeper.connect", "risk-kafka.aku:2181")
        .property("bootstrap.servers", "risk-kafka.aku:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("business", Types.STRING)
        .field("type", Types.STRING)
        .field("es", Types.LONG))
    .inAppendMode()
    .registerTableSource("installmentdb_t_user");
Starting execution of program
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at feature.flinktask.sqltest.main(sqltest.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=risk-kafka.aku:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=risk-kafka.aku:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=installmentdb_t_user
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=business
schema.0.type=VARCHAR
schema.1.name=type
schema.1.type=VARCHAR
schema.2.name=es
schema.2.type=BIGINT
update-mode=append
The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.formats.json.JsonRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
... 20 more
Re: Flink 1.9 SQL job submission fails
Posted by 越张 <zh...@silvrr.com>.
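Yes, that was the problem. I found that the connector was already packaged into my fat jar, yet it still could not be found. Putting the jar into Flink's lib directory fixed it, which is strange.
> On Sep 11, 2019, at 9:35 AM, Dian Fu <di...@gmail.com> wrote:
>
> Judging from your error, Kafka010TableSourceSinkFactory is not on the classpath. You need to add the Kafka connector jar to your dependencies (for 0.10, that is flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.10_2.12).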
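One possible explanation for the fat-jar behaviour described in this reply, which the thread itself does not confirm: Flink discovers table factories through Java's ServiceLoader, which reads META-INF/services/org.apache.flink.table.factories.TableFactory. Several Flink jars ship a file at that same path, and if the fat-jar build keeps only one copy instead of merging them, the Kafka factory entry can be lost even though its classes are inside the jar. The trace is consistent with this (JsonRowFormatFactory was considered, no Kafka factory was). With Maven, the usual remedy is the shade plugin's ServicesResourceTransformer. The sketch below uses only the JDK to print the factories a jar registers; the class name FatJarServiceFileCheck and the jar path are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class FatJarServiceFileCheck {

    // Service file that java.util.ServiceLoader reads for Flink table factories.
    static final String SERVICE_FILE =
        "META-INF/services/org.apache.flink.table.factories.TableFactory";

    /**
     * Returns the factory class names listed in the jar's TableFactory
     * service file, or an empty list if the jar has no such file.
     */
    static List<String> factoriesInJar(String jarPath) {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return names;
            }
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    line = line.trim();
                    // Skip blank lines and comment lines.
                    if (!line.isEmpty() && !line.startsWith("#")) {
                        names.add(line);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        // args[0]: path to the jar to inspect, e.g. target/my-job.jar (hypothetical path)
        for (String name : factoriesInJar(args[0])) {
            System.out.println(name);
        }
    }
}
```

Running this against the fat jar and against the connector jar placed in lib should show whether the Kafka factory entry survived packaging.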
Re: Flink 1.9 SQL job submission fails
Posted by Dian Fu <di...@gmail.com>.
Judging from your error, Kafka010TableSourceSinkFactory is not on the classpath. You need to add the Kafka connector jar to your dependencies (for 0.10, that is flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.10_2.12).
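To check this from inside the job, a small diagnostic along these lines may help. It is a sketch using only the JDK: it tests whether the factory class is loadable, and lists what the TableFactory service files visible to the class loader register. The package in KAFKA_010_FACTORY is an assumption (the reply gives only the simple class name), and FactoryClasspathCheck is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class FactoryClasspathCheck {

    // Service file that java.util.ServiceLoader reads for Flink table factories.
    static final String SERVICE_FILE =
        "META-INF/services/org.apache.flink.table.factories.TableFactory";

    // Assumed fully qualified name; verify it against your connector jar.
    static final String KAFKA_010_FACTORY =
        "org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory";

    /** Returns true if the class loader can load the named class (without initializing it). */
    static boolean isOnClasspath(String className, ClassLoader cl) {
        try {
            Class.forName(className, false, cl);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Collects the class names listed in every copy of the service file visible to cl. */
    static List<String> registeredFactories(String serviceFile, ClassLoader cl) {
        List<String> names = new ArrayList<>();
        try {
            Enumeration<URL> urls = cl.getResources(serviceFile);
            while (urls.hasMoreElements()) {
                try (BufferedReader in = new BufferedReader(new InputStreamReader(
                        urls.nextElement().openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        // Drop trailing comments, then surrounding whitespace.
                        int hash = line.indexOf('#');
                        if (hash >= 0) {
                            line = line.substring(0, hash);
                        }
                        line = line.trim();
                        if (!line.isEmpty()) {
                            names.add(line);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        System.out.println("Kafka 0.10 factory loadable: " + isOnClasspath(KAFKA_010_FACTORY, cl));
        System.out.println("Registered table factories: " + registeredFactories(SERVICE_FILE, cl));
    }
}
```

If the class is loadable but missing from the registered list, the jar is present and the service file entry is what got lost. If the class is not loadable at all, the connector jar itself is missing.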