Posted to user-zh@flink.apache.org by RS <ti...@163.com> on 2020/07/24 09:02:04 UTC

Could not find any factory for identifier 'kafka'

hi,
On Flink 1.11.1 I'm trying to read Kafka data with a SQL DDL job, and it fails when the CREATE statement is executed.
The jar is built as a jar-with-dependencies (fat jar).


Code snippet:
    public String ddlSql = String.format("CREATE TABLE %s (\n" +
            "  number BIGINT,\n" +
            "  msg STRING,\n" +
            "  username STRING,\n" +
            "  update_time TIMESTAMP(3)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = '%s',\n" +
            " 'properties.bootstrap.servers' = '%s',\n" +
            " 'properties.group.id' = '%s',\n" +
            " 'format' = 'json',\n" +
            " 'json.fail-on-missing-field' = 'false',\n" +
            " 'json.ignore-parse-errors' = 'true'\n" +
            ")\n", tableName, topic, servers, group);


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(ddlSql);


Error message:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


I referred to this thread: http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error is still thrown.


Attached are the POM dependencies:
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>


Thanks, everyone~

Re:Re: Re: Could not find any factory for identifier 'kafka'

Posted by RS <ti...@163.com>.
Hi,
1. Got it, good to know.
2. Indeed, after switching some of the Flink dependencies to provided scope, the packaged jar still runs correctly. But a package like flink-walkthrough-common_2.11 doesn't show up in Flink's lib directory, so that one still gets bundled into the jar.
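As a sketch, the split might look like this in the POM from this thread; the scopes are the point, and which artifact belongs in which group is an assumption based on what ships in Flink's lib directory:

    <!-- Already on the cluster's classpath, so keep it out of the fat jar. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Not in Flink's lib directory, so leave the default compile scope
         and let it be bundled into the fat jar. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>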





Re: Re: Could not find any factory for identifier 'kafka'

Posted by Caizhi Weng <ts...@gmail.com>.
Hi,

Flink's TableFactory discovery is built on Java's service-provider (SPI) mechanism, which is why those two files are needed. You should verify whether jar-with-dependencies actually gets those resource files into the jar.
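
As a rough illustration (a hypothetical FactoryScan class, not Flink's actual code), the discovery boils down to a ServiceLoader lookup over whatever is listed in META-INF/services/org.apache.flink.table.factories.Factory:

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.Factory;

    public class FactoryScan {
        public static void main(String[] args) {
            // Roughly what FactoryUtil.discoverFactory does: enumerate every Factory
            // registered via META-INF/services and expose its identifier.
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                System.out.println(factory.factoryIdentifier());
            }
        }
    }

If the services file never makes it into the fat jar, the loader sees only the built-in factories, which matches the "Available factory identifiers are: datagen" output above.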

Also, why bundle the Flink dependencies into the fat jar at all? Flink's own classpath already contains them, so when this jar is submitted to Flink as a user jar there is no need to package the Flink dependencies into it.


Re:Re: Could not find any factory for identifier 'kafka'

Posted by RS <ti...@163.com>.
hi,
Thanks for the reply. After trying many more times, it looks like the problem is not the dependency packages after all.


I added a new directory to my project: resources/META-INF/services
and copied two files from the Flink source: org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory.
Built that way, the error is gone. I don't fully understand why, but it did fix the problem.
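
For reference, a sketch of what such a services file needs to contain for this job; the class names below are what the Flink 1.11 Kafka connector and JSON format modules should register, but verify them against the actual jars. META-INF/services/org.apache.flink.table.factories.Factory lists one fully qualified factory class per line:

    org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
    org.apache.flink.formats.json.JsonFormatFactory

This would also explain the workaround: jar-with-dependencies keeps only one copy of each identically named services file, so the connector's entries are silently dropped unless the build merges them or, as here, they are added by hand.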



Re: Could not find any factory for identifier 'kafka'

Posted by JasonLee <17...@163.com>.
hi
You only need two packages: the -sql connector (flink-sql-connector-kafka_2.12) and -json (flink-json).


JasonLee
Email: 17610775726@163.com


Re: Could not find any factory for identifier 'kafka'

Posted by admin <17...@163.com>.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

These two conflict with each other; remove the first one (flink-connector-kafka_2.12).
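
Put together, a minimal connector/format dependency set would then look like this (following the advice in this thread; flink-sql-connector-kafka_2.12 is the shaded bundle that already contains the plain connector):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>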

> 2020年7月24日 下午5:02,RS <ti...@163.com> 写道:
> 
>   <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-connector-kafka_2.12</artifactId>
>            <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.flink</groupId>
>            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>            <version>${flink.version}</version>
>        </dependency>


Re:Re:Re: Could not find any factory for identifier 'kafka'

Posted by RS <ti...@163.com>.
The formatting of my last email was broken, so I'm re-sending it.


I build the jar and run it directly on the server; I have never run it from IDEA.

> flink run xxx

I'm not using the shade plugin.

Maven build settings:
    <properties>
        <jdk.version>1.8</jdk.version>
        <flink.version>1.11.1</flink.version>
    </properties>



    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>




Re: Could not find any factory for identifier 'kafka'

Posted by Benchao Li <li...@apache.org>.
This is probably related to how you package the jar. Does the program run if you start it directly from IDEA?

If it runs in IDEA but the packaged jar cannot be submitted and run, the SPI service files are very likely the problem.
If you are using the shade plugin, take a look at this transformer [1].

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
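
For example, one way to set it up is with the ServicesResourceTransformer described on that same page (the plugin version here is an assumption):

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers>
                        <!-- Concatenates META-INF/services files from all dependencies,
                             so every connector's factory entries survive in the fat jar. -->
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>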




-- 

Best,
Benchao Li