Posted to issues@flink.apache.org by "Tomoyuki NAKAMURA (Jira)" <ji...@apache.org> on 2022/12/29 13:30:00 UTC

[jira] [Comment Edited] (FLINK-30093) [Flink SQL][Protobuf] CompileException when querying Kafka topic using google.protobuf.Timestamp

    [ https://issues.apache.org/jira/browse/FLINK-30093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17652848#comment-17652848 ] 

Tomoyuki NAKAMURA edited comment on FLINK-30093 at 12/29/22 1:29 PM:
---------------------------------------------------------------------

I am not sure whether 1.16 can handle the google.protobuf.Timestamp type, since I am still using an early implementation of this Protobuf format.
If setting java_multiple_files and defining an outer class is the recommended approach, describing it in the documentation would help a lot of people.

It would also be good to add a unit test for the Timestamp type. The google.protobuf.Timestamp type may no longer be handled, since the implementation seems to have changed from earlier versions.
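
To make the point about java_multiple_files and the outer class concrete, here is a minimal sketch of how those two options determine the Java name of a generated top-level message class. It is an illustration in plain Java, not code from Flink or protoc; the class GeneratedClassName, the method forTopLevelMessage, and the outer class name "TestProto" are hypothetical names chosen for this example.

```java
public class GeneratedClassName {

    /**
     * Returns the Java source name of the class generated for a top-level message.
     * outerClassName stands for the file's outer class (for example the value of
     * java_outer_classname); it is only used when java_multiple_files is false.
     */
    static String forTopLevelMessage(String javaPackage, boolean javaMultipleFiles,
                                     String outerClassName, String messageName) {
        String prefix = javaPackage.isEmpty() ? "" : javaPackage + ".";
        if (javaMultipleFiles) {
            // Each top-level message becomes its own top-level class.
            return prefix + messageName;
        }
        // Otherwise the message class is nested inside the file's outer class.
        return prefix + outerClassName + "." + messageName;
    }

    public static void main(String[] args) {
        // Options taken from the .proto in this issue (java_multiple_files = true).
        System.out.println(forTopLevelMessage("com.example.message", true, "TestProto", "Test"));
        // Same message without java_multiple_files; "TestProto" is a hypothetical outer class name.
        System.out.println(forTopLevelMessage("com.example.message", false, "TestProto", "Test"));
    }
}
```

With the options from the issue, the first call yields com.example.message.Test, which matches the value given for 'protobuf.message-class-name' in the table definition. The second call yields com.example.message.TestProto.Test, so the class name passed to Flink would have to change if java_multiple_files were dropped.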


was (Author: laughingman7743):
I am not sure if 1.16 can handle the google.protobuf..Timestamp type. Since I am still using an early implementation of this Protobuf formatter.
If it is recommended to set multiple_files and define an outer class, it would help a lot of people if there is a description in the documentation.

It would also be better if a unit test of type Timestamp could be added. It is possible that the google.protobuf.Timestamp type may no longer be handled since the implementation seems to have been changed from earlier versions.

> [Flink SQL][Protobuf] CompileException when querying Kafka topic using google.protobuf.Timestamp 
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30093
>                 URL: https://issues.apache.org/jira/browse/FLINK-30093
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.16.0
>         Environment: Mac OS Ventura
>            Reporter: James Mcguire
>            Assignee: hubert dulay
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: taskmanager_172.22.0 (1).4_46291-40eec2_log
>
>
> When using Flink SQL to query a protobuf-serialized Kafka topic whose messages contain a {{google.protobuf.Timestamp}} field, querying the table fails with {{org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot determine simple type name "com"}}.
>  
> *Replication steps:*
> 1. Use a protobuf definition that contains a {{google.protobuf.Timestamp}}:
> {noformat}
> syntax = "proto3";
> package example.message;
> import "google/protobuf/timestamp.proto";
> option java_package = "com.example.message";
> option java_multiple_files = true;
> message Test {
>   int64 id = 1;
>   google.protobuf.Timestamp created_at = 5;
> }{noformat}
> 2. Use the protobuf definition to produce a message to the topic.
> 3. Confirm that the message can be decoded by protoc:
> {code:bash}
> kcat -C -t development.example.message -b localhost:9092 -o -1 -e -q -D "" | protoc --decode=example.message.Test --proto_path=/Users/jamesmcguire/repos/flink-proto-example/schemas/ example/message/test.proto 
> id: 123
> created_at {
>   seconds: 456
>   nanos: 789
> }{code}
> 4. Create a table in Flink SQL using the Kafka connector and the protobuf format:
> {code:sql}
> CREATE TABLE tests (
>   id BIGINT,
>   created_at row<seconds BIGINT, nanos INT>
> )
> COMMENT ''
> WITH (
>   'connector' = 'kafka',
>   'format' = 'protobuf',
>   'protobuf.message-class-name' = 'com.example.message.Test',
>   'properties.auto.offset.reset' = 'earliest',
>   'properties.bootstrap.servers' = 'host.docker.internal:9092',
>   'properties.group.id' = 'test-1',
>   'topic' = 'development.example.message'
> );{code}
> 5. Run a query in Flink SQL and observe the error:
> {code:java}
> Flink SQL> select * from tests;
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot determine simple type name "com" {code}
> *NOTE*: If you repeat steps 4-5 without {{created_at row<seconds BIGINT, nanos INT>}} in the table, step 5 will complete successfully.
> 6. Observe in the attached log file that Flink appears to be using the incorrect namespace (it should be {{google.protobuf.Timestamp}}):
> {code:java}
> com.example.message.Timestamp message3 = message0.getCreatedAt(); {code}
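
The generated line above is consistent with the Java package of a field's message type being taken from the file that declares the field rather than from the file that defines the type. The sketch below illustrates that difference in plain Java. It is an assumption about how such a name could arise, not Flink's actual code generation logic; the class FieldTypeResolution, the map JAVA_PACKAGE_BY_FILE, and the method javaClassName are hypothetical names. Both .proto files involved set java_multiple_files = true, so no outer class appears in either name.

```java
import java.util.HashMap;
import java.util.Map;

public class FieldTypeResolution {

    // Hypothetical lookup table: .proto file -> value of its java_package option.
    static final Map<String, String> JAVA_PACKAGE_BY_FILE = new HashMap<>();
    static {
        JAVA_PACKAGE_BY_FILE.put("example/message/test.proto", "com.example.message");
        JAVA_PACKAGE_BY_FILE.put("google/protobuf/timestamp.proto", "com.google.protobuf");
    }

    /** Builds a Java class name from the java_package of the given file. */
    static String javaClassName(String protoFile, String messageName) {
        return JAVA_PACKAGE_BY_FILE.get(protoFile) + "." + messageName;
    }

    public static void main(String[] args) {
        // Using the file that defines Timestamp gives the class protoc actually generates.
        System.out.println(javaClassName("google/protobuf/timestamp.proto", "Timestamp"));
        // Using the file that merely declares the created_at field reproduces the name in the log.
        System.out.println(javaClassName("example/message/test.proto", "Timestamp"));
    }
}
```

The first call yields com.google.protobuf.Timestamp, the class shipped with the protobuf Java runtime. The second yields com.example.message.Timestamp, the nonexistent class referenced in the generated code from the log.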



--
This message was sent by Atlassian Jira
(v8.20.10#820010)