Posted to user-zh@flink.apache.org by 奔跑的小飞袁 <s_...@126.com> on 2020/10/14 08:55:19 UTC

Flink SQL job submission fails after adding the flink-parquet_2.11 dependency

Hello,
I am using Flink SQL to read data from Kafka and write it out to HDFS in Parquet format. When I add the flink-parquet dependency, the job submission fails; writing with the built-in formats such as avro and json works fine. Here is my error message:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.cloud_behavior_source'.

Table options are:

'connector'='kafka'
'format'='avro'
'properties.bootstrap.servers'='10.2.5.100:9092,10.2.5.101:9092,10.2.5.102:9092'
'properties.group.id'='testGroup'
'scan.startup.mode'='earliest-offset'
'topic'='cloud_behavior'
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.cloud_behavior_source'.

Table options are:

'connector'='kafka'
'format'='avro'
'properties.bootstrap.servers'='10.2.5.100:9092,10.2.5.101:9092,10.2.5.102:9092'
'properties.group.id'='testGroup'
'scan.startup.mode'='earliest-offset'
'topic'='cloud_behavior'
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
	at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
	at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
	at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
	at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:28)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
	... 39 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
	... 40 more

My SQL configuration:
SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

-- Kafka cdbp zdao source table
CREATE TABLE cloud_behavior_source(
    ...
) WITH (
    'connector'='kafka',
    'topic'='cloud_behavior',
    'properties.bootstrap.servers'='',
    'properties.group.id'='testGroup',
    'format'='avro',
    'scan.startup.mode'='earliest-offset'
);

-- HBase zdao UV statistics sink table
CREATE TABLE cloud_behavior_sink(
    ...
) PARTITIONED BY (operation) WITH (
    'connector'='filesystem',
    'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
    'format'='parquet'
);

-- Business logic
INSERT INTO cloud_behavior_sink
SELECT
     *
FROM cloud_behavior_source;
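For context on the error above: Flink's table planner discovers connectors and formats at runtime through Java's ServiceLoader (the `META-INF/services/org.apache.flink.table.factories.Factory` files shipped in each connector jar). The stack trace showing only `datagen` as an available factory suggests those service files were lost when the job jar was rebuilt after adding flink-parquet. A common cause is a fat-jar build that lets one dependency's service file overwrite another's. A sketch of a maven-shade-plugin configuration that merges the service files instead (assuming a Maven build; the plugin version shown is illustrative):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- Merges all META-INF/services entries so factories from
               flink-connector-kafka, flink-parquet, etc. all stay
               discoverable via ServiceLoader in the shaded jar. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```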



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL job submission fails after adding the flink-parquet_2.11 dependency

Posted by zhuxiaoshang <zh...@gmail.com>.
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

This looks like the kafka connector dependency is missing.
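For a Maven build, the missing dependency would look roughly like the fragment below (a sketch: the `_2.11` suffix assumes a Scala 2.11 build as in the subject line, and the `${flink.version}` placeholder must match the Flink distribution running the job):

```xml
<!-- Kafka table connector for Flink SQL (Scala 2.11 build). -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
```

Alternatively, the bundled `flink-sql-connector-kafka_2.11` jar can be dropped into the cluster's `lib/` directory instead of being packaged into the job jar.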


> On 2020/10/14 at 4:55 PM, 奔跑的小飞袁 <s_...@126.com> wrote:


Re: Flink SQL job submission fails after adding the flink-parquet_2.11 dependency

Posted by yujianbo <15...@163.com>.
How did you end up solving this? I have suddenly run into the same problem.


