Posted to user@flink.apache.org by Lasse Nedergaard <la...@gmail.com> on 2023/03/15 10:34:42 UTC

Kafka sql with validation exception

Hi. 

I have a simple job creating a table from Kafka. It works perfectly on my local machine, but when I build an uber jar and use the official Flink image I get a validation exception.
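
For context, a minimal sketch of what the job looks like (table name, schema, and connector options here are illustrative, not my exact code):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a table backed by the Kafka connector
        tEnv.executeSql(
                "CREATE TABLE events (\n"
                + "  id STRING,\n"
                + "  ts TIMESTAMP(3)\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'events',\n"
                + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                + "  'format' = 'json'\n"
                + ")");

        // Factory discovery for the 'kafka' identifier happens when the
        // table is actually used; this is where the exception is thrown
        tEnv.executeSql("SELECT * FROM events").print();
    }
}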

Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

The uber jar contains flink-connector-kafka and flink-sql-connector-kafka.

I can see that on my local machine the factory discovery is called in flink-table-common, but on my cluster it uses flink-table-api-java-uber.

And the list of available identifiers doesn't contain 'kafka' and 'upsert-kafka' as it does on my local machine. 🤔

Does anyone have a clue where I should look for the problem?

Med venlig hilsen / Best regards
Lasse Nedergaard


Re: Kafka sql with validation exception

Posted by Lasse Nedergaard <la...@gmail.com>.
I got it to work.

Thanks for pointing me in the right direction.

I had some Flink dependencies that weren't set to provided, and I removed flink-sql-connector-kafka; that seems to have fixed the problem.
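
For anyone finding this later, the shape of the fix in the pom looked roughly like this (a sketch; the artifact names and version are illustrative, not copied from my build):

<!-- Core Flink dependencies are provided by the official image
     and must not be bundled into the uber jar -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java</artifactId>
  <version>1.16.1</version>
  <scope>provided</scope>
</dependency>
<!-- The Kafka connector is not in the image, so it keeps the
     default compile scope and is shaded into the uber jar -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.16.1</version>
</dependency>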

Thanks once again.

Med venlig hilsen / Best regards

Lasse Nedergaard


Re: Kafka sql with validation exception

Posted by Lasse Nedergaard <la...@gmail.com>.
Hi.

Thanks Shammon.

You are right, org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory is not in the file.

I'm using the shade plugin as described, and the only difference from my other project is the nested project structure: I have “my project”/Flink/“my flink project”/src/main/java/…

So if you have any ideas why it isn't shaded correctly, it would help a lot.

Med venlig hilsen / Best regards

Lasse Nedergaard


Re: Kafka sql with validation exception

Posted by Hang Ruan <ru...@gmail.com>.
Hi, Lasse,

I think you should first verify the points Shammon raised.

Maybe you need to use the maven-shade-plugin like this to package, and make sure the files in `META-INF/services` are merged together:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- Merge META-INF/services files from all input jars so
                   SPI factory discovery still works in the uber jar -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
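
Without the ServicesResourceTransformer, the shade plugin typically keeps only one `META-INF/services/org.apache.flink.table.factories.Factory` file when several input jars provide one, so the Kafka factory entries can silently disappear from the uber jar.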


Best,
Hang


Re: Kafka sql with validation exception

Posted by Shammon FY <zj...@gmail.com>.
Hi Lasse

I think you can first check whether the file
`META-INF/services/org.apache.flink.table.factories.Factory` exists in your
uber jar and whether
`org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory`
is listed in it. Flink discovers and creates table factories from that file.
Then you can check whether your uber jar is on the classpath of the Flink
cluster.
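
A quick way to inspect the jar (just a sketch; pass the path of your uber jar as the first argument):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class CheckFactorySpi {
    public static void main(String[] args) throws Exception {
        try (JarFile jar = new JarFile(args[0])) { // path to the uber jar
            JarEntry entry = jar.getJarEntry(
                    "META-INF/services/org.apache.flink.table.factories.Factory");
            if (entry == null) {
                System.out.println("SPI file is missing from the jar");
                return;
            }
            // Print every registered factory; the Kafka connector should
            // contribute KafkaDynamicTableFactory to this list
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry)))) {
                reader.lines().forEach(System.out::println);
            }
        }
    }
}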

Best,
Shammon FY

