Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2020/12/24 10:59:20 UTC

Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Hi,

I have a UDF which returns a type of MAP<STRING, LEGACY('RAW',
'ANY<io.circe.Json>')>. When I try to register this type with Flink via the
CREATE TABLE DDL, I encounter an exception:

- SQL parse failed. Encountered "(" at line 2, column 256.
Was expecting one of:
    "NOT" ...
    "NULL" ...
    ">" ...
    "MULTISET" ...
    "ARRAY" ...
    "." ...

It looks like the SQL parser rejects the parentheses on the LEGACY type.
What is the correct way to register a table with this type in Flink?
-- 
Best Regards,
Yuval Itzchakov.
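
For readers following along: later in this thread it is mentioned that the
CREATE TABLE statements are generated programmatically, and that LEGACY(...)
has no string form usable in DDL. A minimal sketch of such a generator, with
a fail-fast check for that case, might look like the following. It is plain
Java and does not call Flink; the class name `DdlSketch`, its helper methods,
and the table and column names are all hypothetical, and the connector
`WITH (...)` clause is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class DdlSketch {

    // Returns true if the type string uses the LEGACY(...) form, which the
    // thread reports the DDL parser cannot read.
    public static boolean usesLegacyType(String type) {
        return type.toUpperCase(Locale.ROOT).contains("LEGACY(");
    }

    // Builds "CREATE TABLE <table> (<col> <type>, ...)" from an ordered
    // column-name -> type-string map. Throws before any SQL reaches the
    // parser if a column type has no DDL representation.
    public static String createTable(String table, Map<String, String> columns) {
        StringBuilder sb = new StringBuilder("CREATE TABLE ").append(table).append(" (\n");
        int i = 0;
        for (Map.Entry<String, String> col : columns.entrySet()) {
            if (usesLegacyType(col.getValue())) {
                throw new IllegalArgumentException(
                    "Column '" + col.getKey() + "' has a type with no DDL form: " + col.getValue());
            }
            sb.append("  ").append(col.getKey()).append(' ').append(col.getValue());
            sb.append(++i < columns.size() ? ",\n" : "\n");
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        // Columns whose types all have a DDL string form.
        Map<String, String> ok = new LinkedHashMap<>();
        ok.put("id", "STRING");
        ok.put("tags", "MAP<STRING, STRING>");
        System.out.println(createTable("events", ok));

        // The type string from the original question is rejected up front.
        Map<String, String> bad = new LinkedHashMap<>();
        bad.put("payload", "MAP<STRING, LEGACY('RAW', 'ANY<io.circe.Json>')>");
        try {
            createTable("events", bad);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

A generator with a check like this would surface the unsupported type as an
ordinary exception naming the column, rather than as a parser error pointing
at a character offset in the generated statement.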

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov <yu...@gmail.com>.
I see, alright.

Thank you for all the help!

On Mon, Dec 28, 2020 at 3:57 PM Timo Walther <tw...@apache.org> wrote:

> Hi Yuval,
>
> FLIP-136 will resolve the remaining legacy type issues for DataStream
> API. After that we can drop legacy types because everything can be
> expressed with DataType. DataType is a super type of TypeInformation.
>
> Registering types (e.g. giving a RAW type backed by io.circe.Json a name
> such that it can be used in a DDL statement) is also in the backlog but
> requires another FLIP.
>
> Regards,
> Timo

-- 
Best Regards,
Yuval Itzchakov.

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Timo Walther <tw...@apache.org>.
Hi Yuval,

FLIP-136 will resolve the remaining legacy type issues for DataStream 
API. After that we can drop legacy types because everything can be 
expressed with DataType. DataType is a super type of TypeInformation.

Registering types (e.g. giving a RAW type backed by io.circe.Json a name 
such that it can be used in a DDL statement) is also in the backlog but 
requires another FLIP.

Regards,
Timo


On 28.12.20 13:04, Yuval Itzchakov wrote:
> Hi Timo,
> I will look at the Catalog interface and see what it requires.
> 
> In the meantime I'll go back to the old API. Do you expect FLIP-136 to
> resolve the issue around legacy types? Will its implementation allow
> registering LEGACY types, or a new variation of them?


Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Timo,
I will look at the Catalog interface and see what it requires.

In the meantime I'll go back to the old API. Do you expect FLIP-136 to
resolve the issue around legacy types? Will its implementation allow
registering LEGACY types, or a new variation of them?

On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <tw...@apache.org> wrote:

> I would recommend using the old UDF stack for now. You can simply call
> `StreamTableEnvironment.registerFunction` instead of
> `createTemporarySystemFunction`. The UDF then returns a legacy type that
> the DataStream API understands.
>
> Have you thought about implementing your own catalog instead of
> generating CREATE TABLE statements? The catalog API seems a bit complex
> at first glance but only requires implementing a couple of methods. You
> can implement your own `CatalogTable`, which is the parsed
> representation of a `CREATE TABLE` statement. That would give you full
> control over the entire type stack end-to-end.
>
> Regards,
> Timo

-- 
Best Regards,
Yuval Itzchakov.

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Timo Walther <tw...@apache.org>.
I would recommend using the old UDF stack for now. You can simply call
`StreamTableEnvironment.registerFunction` instead of
`createTemporarySystemFunction`. The UDF then returns a legacy type that
the DataStream API understands.

Have you thought about implementing your own catalog instead of
generating CREATE TABLE statements? The catalog API seems a bit complex
at first glance but only requires implementing a couple of methods. You
can implement your own `CatalogTable`, which is the parsed
representation of a `CREATE TABLE` statement. That would give you full
control over the entire type stack end-to-end.

Regards,
Timo

On 28.12.20 10:36, Yuval Itzchakov wrote:
> Timo, an additional question.
> 
> I am currently using TypeConversions.fromLegacyInfoToDataType. However,
> this still does not allow me to register this table with the catalog,
> since no representation of LEGACY is supported (I don't see it
> generating any DataType other than RAW(.., LEGACY)). Is there any
> way I can preserve the underlying type and still register the column
> somehow with CREATE TABLE?
> 
> On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <yuvalos@gmail.com> wrote:
> 
>     Hi Timo,
> 
>     Thanks for the explanation. Passing around a byte array is not
>     possible since I need to know the concrete type later for
>     serialization in my sink, so I need to keep the type.
> 
>     What I'm trying to achieve is the following:
>     I have a scalar UDF function:
> 
>     [attachment image.png showing the UDF, not preserved in this plain-text copy]
> 
>     This function is later used in processing of a Flink table, by
>     calling PARSE_JSON on the selected field.
>     Whoever uses these UDFs isn't aware of any Flink syntax for creating
>     and registering tables; I generate the CREATE TABLE statement for
>     each table dynamically, behind the scenes.
>     In order for this to work, I need the UDF to output
>     the correct type (in this example, io.circe.Json). Later, this JSON
>     type (or any other type returned by the UDF for that matter) will
>     get serialized in a custom sink and into a data warehouse (that's
>     why I need to keep the concrete type, since serialization happens at
>     the edges before writing it out).
> 
>     The reason I'm converting between Table and DataStream is that this
>     table will be further manipulated before being written to the custom
>     DynamicTableSink by applying transformations on a DataStream[Row],
>     and then converted back to a Table to be used in an additional
>     CREATE TABLE and then INSERT INTO statement.
> 
>     This is why I have these conversions back and forth, and why I
>     somehow need a way to register the legacy types as a valid type of
>     the table.
>     Hope that clarifies a bit, since the pipeline is rather complex I
>     can't really share a MVCE of it.
> 
>     On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <twalthr@apache.org
>     <ma...@apache.org>> wrote:
> 
>         Hi Yuval,
> 
>         the legacy type has no string representation that can be used in
>         a SQL
>         DDL statement. The current string representation LEGACY(...) is
>         only a
>         temporary work around to persist the old types in catalogs.
> 
>         Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
>         support only legacy type info. So I would recommend to use the
>         legacy
>         type in the UDF return type as well. Either you use the old
>         `getResultType` method or you override `getTypeInference` and call
>         `TypeConversions.fromLegacyInfoToDataType`.
> 
>         Another work around could be that you simply use `BYTES` as the
>         return
>         type and pass around a byte array instead.
> 
>         Maybe you can show us a little end-to-end example, what you are
>         trying
>         to achieve?
> 
>         Regards,
>         Timo
> 
> 
>         On 28.12.20 07:47, Yuval Itzchakov wrote:
>          > Hi Danny,
>          >
>          > Yes, I tried implementing the DataTypeFactory for the UDF using
>          > TypeInformationRawType (which is deprecated BTW, and there's
>         no support
>          > for RawType in the conversion), didn't help.
>          >
>          > I did manage to get the conversion working using
>          > TableEnvironment.toAppendStream (I was previously directly
>         calling
>          > TypeConversions) but still remains the problem that Flink
>         can't register
>          > LEGACY types via the CREATE TABLE DDL
>          >
>          > On Mon, Dec 28, 2020, 04:25 Danny Chan <danny0405@apache.org
>         <ma...@apache.org>
>          > <mailto:danny0405@apache.org <ma...@apache.org>>>
>         wrote:
>          >
>          >      > SQL parse failed. Encount
>          >     What syntax did you use ?
>          >
>          >      > TypeConversions.fromDataTypeToLegacyInfo cannot
>         convert a plain
>          >     RAW type back to TypeInformation.
>          >
>          >     Did you try to construct type information by a new
>          >     fresh TypeInformationRawType ?
>          >
>          >     Yuval Itzchakov <yuvalos@gmail.com
>         <ma...@gmail.com> <mailto:yuvalos@gmail.com
>         <ma...@gmail.com>>> 于
>          >     2020年12月24日周四 下午7:24写道:
>          >
>          >         An expansion to my question:
>          >
>          >         What I really want is for the UDF to return
>         `RAW(io.circe.Json,
>          >         ?)` type, but I have to do a conversion between Table and
>          >         DataStream, and
>         TypeConversions.fromDataTypeToLegacyInfo cannot
>          >         convert a plain RAW type back to TypeInformation.
>          >
>          >         On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov
>          >         <yuvalos@gmail.com <ma...@gmail.com>
>         <mailto:yuvalos@gmail.com <ma...@gmail.com>>> wrote:
>          >
>          >             Hi,
>          >
>          >             I have a UDF which returns a type of MAP<STRING,
>          >             LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try
>         to register
>          >             this type with Flink via the CREATE TABLE DDL, I
>         encounter
>          >             an exception:
>          >
>          >             - SQL parse failed. Encountered "(" at line 2,
>         column 256.
>          >             Was expecting one of:
>          >                  "NOT" ...
>          >                  "NULL" ...
>          >                  ">" ...
>          >                  "MULTISET" ...
>          >                  "ARRAY" ...
>          >                  "." ...
>          >
>          >             Which looks like the planner doesn't like the
>         round brackets
>          >             on the LEGACY type. What is the correct way to
>         register the
>          >             table with this type with Flink?
>          >             --
>          >             Best Regards,
>          >             Yuval Itzchakov.
>          >
>          >
>          >
>          >         --
>          >         Best Regards,
>          >         Yuval Itzchakov.
>          >
> 
> 
> 
>     -- 
>     Best Regards,
>     Yuval Itzchakov.
> 
> 
> 
> -- 
> Best Regards,
> Yuval Itzchakov.


Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov <yu...@gmail.com>.
Timo, an additional question.

I am currently using TypeConversions.fromLegacyInfoToDataType. However,
this still does not allow me to register this table with the catalog since
no representation of LEGACY is supported (I don't see it generating any
DataType other than RAW(.., LEGACY)). Is there any way I can preserve
the underlying type and still register the column somehow with CREATE TABLE?

On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi Timo,
>
> Thanks for the explanation. Passing around a byte array is not possible
> since I need to know the concrete type later for serialization in my sink,
> so I need to keep the type.
>
> What I'm trying to achieve is the following:
> I have a scalar UDF function:
>
> [image: image.png]
>
> This function is later used in processing of a Flink table, by calling
> PARSE_JSON on the selected field.
> Whoever uses these UDFs isn't aware of any Flink syntax for creating and
> registering tables; I generate the CREATE TABLE statement for each table
> dynamically, behind the scenes. In order for this to work, I need the UDF
> to output the correct type (in this example, io.circe.Json). Later, this
> JSON type (or any other type returned by the UDF for that matter) will get
> serialized in a custom sink and written into a data warehouse (that's why I
> need to keep the concrete type, since serialization happens at the edges
> before writing it out).
>
> The reason I'm converting between Table and DataStream is that this table
> will be further manipulated before being written to the custom
> DynamicTableSink by applying transformations on a DataStream[Row], and then
> converted back to a Table to be used in an additional CREATE TABLE and then
> INSERT INTO statement.
>
> This is why I have these conversions back and forth, and why I somehow
> need a way to register the legacy types as a valid type of the table.
> I hope that clarifies things a bit; since the pipeline is rather complex, I
> can't really share an MVCE of it.
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best Regards,
Yuval Itzchakov.

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Timo,

Thanks for the explanation. Passing around a byte array is not possible
since I need to know the concrete type later for serialization in my sink,
so I need to keep the type.

What I'm trying to achieve is the following:
I have a scalar UDF function:

[image: image.png]

This function is later used in processing of a Flink table, by calling
PARSE_JSON on the selected field.
Whoever uses these UDFs isn't aware of any Flink syntax for creating and
registering tables; I generate the CREATE TABLE statement for each table
dynamically, behind the scenes. In order for this to work, I need the UDF
to output the correct type (in this example, io.circe.Json). Later, this
JSON type (or any other type returned by the UDF for that matter) will get
serialized in a custom sink and written into a data warehouse (that's why I
need to keep the concrete type, since serialization happens at the edges
before writing it out).

The reason I'm converting between Table and DataStream is that this table
will be further manipulated before being written to the custom
DynamicTableSink by applying transformations on a DataStream[Row], and then
converted back to a Table to be used in an additional CREATE TABLE and then
INSERT INTO statement.

This is why I have these conversions back and forth, and why I somehow need
a way to register the legacy types as a valid type of the table.
I hope that clarifies things a bit; since the pipeline is rather complex, I
can't really share an MVCE of it.

On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <tw...@apache.org> wrote:

> Hi Yuval,
>
> The legacy type has no string representation that can be used in a SQL
> DDL statement. The current string representation LEGACY(...) is only a
> temporary workaround to persist the old types in catalogs.
>
> Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
> support only legacy type info. So I would recommend using the legacy
> type in the UDF return type as well. Either use the old
> `getResultType` method, or override `getTypeInference` and call
> `TypeConversions.fromLegacyInfoToDataType`.
>
> Another workaround could be to simply use `BYTES` as the return
> type and pass around a byte array instead.
>
> Maybe you can show us a small end-to-end example of what you are trying
> to achieve?
>
> Regards,
> Timo

-- 
Best Regards,
Yuval Itzchakov.

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Timo Walther <tw...@apache.org>.
Hi Yuval,

The legacy type has no string representation that can be used in a SQL
DDL statement. The current string representation LEGACY(...) is only a
temporary workaround to persist the old types in catalogs.

Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
support only legacy type info. So I would recommend using the legacy
type in the UDF return type as well. Either use the old
`getResultType` method, or override `getTypeInference` and call
`TypeConversions.fromLegacyInfoToDataType`.
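
The second option could look roughly like the sketch below, assuming
Flink 1.12 and circe-parser on the classpath. The class name
`ParseJson`, the single STRING argument and the parsing logic are
illustrative assumptions:

```scala
import io.circe.Json
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}
import org.apache.flink.table.types.utils.TypeConversions

// Hypothetical UDF on the new function stack with a legacy output type.
class ParseJson extends ScalarFunction {
  // Falls back to JSON null when the input cannot be parsed.
  def eval(input: String): Json =
    io.circe.parser.parse(input).fold(_ => Json.Null, identity)

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    // Wrap the legacy TypeInformation in a DataType so that
    // toAppendStream/toRetractStream can map it back again.
    val jsonType = TypeConversions.fromLegacyInfoToDataType(
      TypeInformation.of(classOf[Json]))

    TypeInference.newBuilder()
      .typedArguments(DataTypes.STRING())
      .outputTypeStrategy(TypeStrategies.explicit(jsonType))
      .build()
  }
}
```

The resulting type is still a LEGACY type, so it works for the
Table/DataStream conversion but cannot be written into a DDL string.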

Another workaround could be to simply use `BYTES` as the return
type and pass around a byte array instead.
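
A minimal sketch of the byte-array round trip, using plain strings as a
stand-in for the JSON value so that it runs without Flink or circe. The
object name `JsonBytes` is hypothetical. A UDF's `eval` would return
`JsonBytes.encode(...)`, the column would be declared as `BYTES`, and
the sink would call `JsonBytes.decode(...)`:

```scala
import java.nio.charset.StandardCharsets.UTF_8

object JsonBytes {
  // UDF side: turn the JSON text into an opaque byte array.
  def encode(jsonText: String): Array[Byte] = jsonText.getBytes(UTF_8)

  // Sink side: recover the JSON text from the byte array.
  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}
```

The trade-off is that the column type no longer says what the bytes
contain, so the sink has to know by other means how to decode them.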

Maybe you can show us a small end-to-end example of what you are trying
to achieve?

Regards,
Timo


On 28.12.20 07:47, Yuval Itzchakov wrote:
> Hi Danny,
> 
> Yes, I tried implementing the DataTypeFactory for the UDF using
> TypeInformationRawType (which is deprecated BTW, and there's no support
> for RawType in the conversion), but that didn't help.
>
> I did manage to get the conversion working using
> TableEnvironment.toAppendStream (I was previously calling
> TypeConversions directly), but the problem remains that Flink can't
> register LEGACY types via the CREATE TABLE DDL.


Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Danny,

Yes, I tried implementing the DataTypeFactory for the UDF using
TypeInformationRawType (which is deprecated BTW, and there's no support for
RawType in the conversion), but that didn't help.

I did manage to get the conversion working using
TableEnvironment.toAppendStream (I was previously calling
TypeConversions directly), but the problem remains that Flink can't
register LEGACY types via the CREATE TABLE DDL.

On Mon, Dec 28, 2020, 04:25 Danny Chan <da...@apache.org> wrote:

> > SQL parse failed. Encount
> What syntax did you use?
>
> > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
> > back to TypeInformation.
>
> Did you try constructing the type information from a fresh
> TypeInformationRawType?

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Danny Chan <da...@apache.org>.
> SQL parse failed. Encount
What syntax did you use?

> TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
> back to TypeInformation.

Did you try constructing the type information from a fresh
TypeInformationRawType?

On Thu, Dec 24, 2020 at 7:24 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> An expansion to my question:
>
> What I really want is for the UDF to return `RAW(io.circe.Json, ?)` type,
> but I have to do a conversion between Table and DataStream, and
> TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
> back to TypeInformation.

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov <yu...@gmail.com>.
To expand on my question:

What I really want is for the UDF to return a `RAW(io.circe.Json, ?)` type,
but I have to do a conversion between Table and DataStream, and
TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type
back to TypeInformation.

On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi,
>
> I have a UDF which returns a type of MAP<STRING, LEGACY('RAW',
> 'ANY<io.circe.Json>')>. When I try to register this type with Flink via the
> CREATE TABLE DDL, I encounter an exception:
>
> - SQL parse failed. Encountered "(" at line 2, column 256.
> Was expecting one of:
>     "NOT" ...
>     "NULL" ...
>     ">" ...
>     "MULTISET" ...
>     "ARRAY" ...
>     "." ...
>
> Which looks like the planner doesn't like the round brackets on the LEGACY
> type. What is the correct way to register the table with this type with
> Flink?
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best Regards,
Yuval Itzchakov.