Posted to user@flink.apache.org by James McGuire via user <us...@flink.apache.org> on 2022/10/04 23:20:59 UTC

ClassNotFoundException when loading protobuf message class in Flink SQL

Hi Flink Community,
I am trying out the new protobuf format added in 1.16 ([1]).  I have built
master locally and tried to follow the Protobuf Format doc ([2]) to create
a table with the Kafka connector using the protobuf format.

I compiled the sample .proto file using protoc version 3.2.0, compiled the
.java output files using javac, linking to protobuf-java-3.5.1.jar (using
earlier versions gives me compiler errors about UnusedPrivateParameter) and
packaged the resulting class files into SimpleTest.jar.

However, when I try to select from the table, I get the following error:
% ./sql-client.sh \
    --jar ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar \
    --jar ~/repos/flink/flink-connectors/flink-connector-kafka/target/flink-connector-kafka-1.17-SNAPSHOT.jar \
    --jar ~/repos/flink/flink-formats/flink-sql-protobuf/target/flink-sql-protobuf-1.17-SNAPSHOT.jar
Flink SQL> CREATE TABLE simple_test (
>   uid BIGINT,
>   name STRING,
>   category_type INT,
>   content BINARY,
>   price DOUBLE,
>   value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
>   value_arr array<row<v1 BIGINT, v2 INT>>,
>   corpus_int INT,
>   corpus_str STRING
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'protobuf',
>  'protobuf.message-class-name' = 'com.example.SimpleTest',
>  'protobuf.ignore-parse-errors' = 'true'
> )
> ;
[INFO] Execute statement succeed.

Flink SQL> select * from simple_test;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.example.SimpleTest

Flink SQL>

Any advice greatly appreciated, thank you.

[1]
https://github.com/apache/flink/commit/5c87b69b5300e8678629aa8b769d60ec2fdbf3d1
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/

Re: ClassNotFoundException when loading protobuf message class in Flink SQL

Posted by James McGuire via user <us...@flink.apache.org>.
Thanks for the tip, I will use the flink-sql-connector-kafka jar instead.

I was able to get it to work by moving SimpleTest.jar into the lib
folder.  I am not sure why this worked while passing the jar with the
--jar flag did not.
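
Roughly, that workaround looks like the sketch below. It assumes FLINK_HOME
points at the Flink distribution directory, and the helper name
install_jar_into_lib is hypothetical (it is not part of Flink). Jars under lib
are put on the classpath when the Flink processes start, so the cluster and
the SQL client need a restart after the copy.

```shell
# Hypothetical helper: copy a user jar into the lib folder of a Flink
# distribution. Arguments: <jar file> <Flink home directory>.
install_jar_into_lib() {
  jar_file="$1"
  flink_home="$2"
  if [ ! -f "$jar_file" ]; then
    echo "jar not found: $jar_file" >&2
    return 1
  fi
  if [ ! -d "$flink_home/lib" ]; then
    echo "no lib folder under: $flink_home" >&2
    return 1
  fi
  cp -- "$jar_file" "$flink_home/lib/"
}

# Example (FLINK_HOME is an assumption; restart the cluster and SQL client afterwards):
#   install_jar_into_lib ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar "$FLINK_HOME"
```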

Thanks,
James

On Tue, Oct 4, 2022 at 7:32 PM Benchao Li <li...@apache.org> wrote:

> Hi James,
>
> Your steps seem right. Could you check that your jar file
> '~/repos/simple_protobuf/SimpleTest/SimpleTest.jar'
> actually contains 'com.example.SimpleTest.class'?
>
> Besides that, to use the Kafka connector in the SQL client, you should use
> 'flink-sql-connector-kafka' instead of
> 'flink-connector-kafka'.
>
> --
>
> Best,
> Benchao Li
>



Re: ClassNotFoundException when loading protobuf message class in Flink SQL

Posted by Benchao Li <li...@apache.org>.
Hi James,

Your steps seem right. Could you check that your jar file
'~/repos/simple_protobuf/SimpleTest/SimpleTest.jar'
actually contains 'com.example.SimpleTest.class'?
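
A minimal sketch of that check, for anyone hitting the same error. It assumes
a POSIX shell and the JDK's jar tool on the PATH; the helper names
class_entry_path and listing_has_class are made up for illustration. A class
named com.example.SimpleTest is stored in the archive under the entry
com/example/SimpleTest.class, so the helper looks for exactly that line in the
jar listing.

```shell
# Hypothetical helpers, not part of Flink or the JDK.

# Convert a fully qualified class name into the entry path used inside a jar,
# e.g. com.example.SimpleTest -> com/example/SimpleTest.class
class_entry_path() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Read a jar listing (one entry per line, as printed by `jar tf`) on stdin and
# succeed only if it contains the entry for the given class name.
listing_has_class() {
  grep -qxF -- "$(class_entry_path "$1")"
}

# Example (jar path taken from this thread):
#   jar tf ~/repos/simple_protobuf/SimpleTest/SimpleTest.jar \
#     | listing_has_class com.example.SimpleTest && echo found || echo missing
```

If the check reports the entry as missing, the class was probably packaged
under a different directory layout than its Java package name.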

Besides that, to use the Kafka connector in the SQL client, you should use
'flink-sql-connector-kafka' instead of
'flink-connector-kafka'.


-- 

Best,
Benchao Li