Posted to dev@flink.apache.org by Zhenghua Gao <do...@gmail.com> on 2020/02/03 07:52:18 UTC

[DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Hi all,

FLINK-12254 [1] [2] updated TableSink and related interfaces to the new
type system, which allows connectors to use the new type system based on
DataTypes.

However, FLINK-12911 ported UpsertStreamTableSink and RetractStreamTableSink
to flink-api-java-bridge, and they still return the TypeInformation of the
requested record type, which cannot represent types with precision and
scale, e.g. TIMESTAMP(p) and DECIMAL(p,s).

/**
 * Returns the requested record type.
 */
TypeInformation<T> getRecordType();


My proposal is to deprecate the *getRecordType* API and add a
*getRecordDataType* API that returns the data type of the requested
record instead. I have filed FLINK-15469 [3] and opened an initial PR to
verify the approach.
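
To make the proposal concrete, here is a minimal, self-contained sketch.
All class names in it (SketchTypeInfo, SketchDecimalType, SketchUpsertSink,
PriceSink) are hypothetical stand-ins and not Flink classes; in the actual
proposal getRecordDataType would return Flink's DataType. The sketch only
illustrates why a class-based descriptor loses precision and scale while a
data type can carry them.

```java
import java.math.BigDecimal;

/** Hypothetical stand-in for a legacy type descriptor: it only knows the Java class. */
final class SketchTypeInfo<T> {
    private final Class<T> typeClass;

    SketchTypeInfo(Class<T> typeClass) {
        this.typeClass = typeClass;
    }

    @Override
    public String toString() {
        return typeClass.getSimpleName();
    }
}

/** Hypothetical stand-in for a data type that also carries precision and scale. */
final class SketchDecimalType {
    private final int precision;
    private final int scale;

    SketchDecimalType(int precision, int scale) {
        this.precision = precision;
        this.scale = scale;
    }

    @Override
    public String toString() {
        return "DECIMAL(" + precision + ", " + scale + ")";
    }
}

/** Sketch of the proposed sink contract; this is not the real UpsertStreamTableSink. */
interface SketchUpsertSink<T> {
    /** Existing method: cannot express precision or scale. */
    @Deprecated
    SketchTypeInfo<T> getRecordType();

    /** Proposed replacement: returns a precision-aware description of the record. */
    SketchDecimalType getRecordDataType();
}

/** Example sink whose records are decimals with precision 10 and scale 2. */
final class PriceSink implements SketchUpsertSink<BigDecimal> {
    @Override
    @Deprecated
    public SketchTypeInfo<BigDecimal> getRecordType() {
        return new SketchTypeInfo<>(BigDecimal.class);
    }

    @Override
    public SketchDecimalType getRecordDataType() {
        return new SketchDecimalType(10, 2);
    }
}

final class RecordTypeSketch {
    public static void main(String[] args) {
        PriceSink sink = new PriceSink();
        System.out.println(sink.getRecordType());     // precision and scale are lost
        System.out.println(sink.getRecordDataType()); // precision and scale are kept
    }
}
```

Keeping the old method as deprecated, rather than removing it, is what lets
existing connectors keep compiling while they migrate.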

What do you think about this API change? Any feedback is appreciated.
[1] https://issues.apache.org/jira/browse/FLINK-12254
[2] https://github.com/apache/flink/pull/8596
[3] https://issues.apache.org/jira/browse/FLINK-15469

*Best Regards,*
*Zhenghua Gao*

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Zhenghua Gao <do...@gmail.com>.
Should we distinguish between the *record data type* and the *consumed
data type*? The current design of UpsertStreamTableSink and
RetractStreamTableSink DOES distinguish them.

In my proposal, the framework ignores *getConsumedDataType*,
so it would be fine to use *getConsumedDataType* for this purpose if we
do not distinguish between the *record data type* and the *consumed data
type*.

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <yk...@gmail.com> wrote:

> Would overriding `getConsumedDataType` do the job?
>
> Best,
> Kurt

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Zhenghua Gao <do...@gmail.com>.
Thanks, Timo! Looking forward to your design!

*Best Regards,*
*Zhenghua Gao*


On Fri, Feb 7, 2020 at 5:26 PM Timo Walther <tw...@apache.org> wrote:

> Hi Zhenghua,
>
> Jark is right. The reason we haven't updated those interfaces yet is
> that we would actually like to introduce new interfaces. We should
> target new interfaces in this release. Even a short-term fix such as the
> `getRecordDataType` you proposed does not actually help, as Jingsong
> pointed out, because we cannot represent tuples in DataType, and we are
> not planning to support them natively, only as a structured type in
> the future.
>
> In my envisioned design, the new sink interface should always receive a
> `ChangeRow`, which is never serialized and is just a data structure for
> communicating between the wrapping sink function and the sink function
> returned by the table sink.
>
> Let me sketch a rough design document that I will share with you
> shortly. Then we could also discuss alternatives.
>
> Thanks,
> Timo
>

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Jark Wu <im...@gmail.com>.
Cool! Looking forward to the design doc.

Best,
Jark

On Fri, 7 Feb 2020 at 17:26, Timo Walther <tw...@apache.org> wrote:

> Hi Zhenghua,
>
> Jark is right. The reason we haven't updated those interfaces yet is
> that we would actually like to introduce new interfaces. We should
> target new interfaces in this release. Even a short-term fix such as the
> `getRecordDataType` you proposed does not actually help, as Jingsong
> pointed out, because we cannot represent tuples in DataType, and we are
> not planning to support them natively, only as a structured type in
> the future.
>
> In my envisioned design, the new sink interface should always receive a
> `ChangeRow`, which is never serialized and is just a data structure for
> communicating between the wrapping sink function and the sink function
> returned by the table sink.
>
> Let me sketch a rough design document that I will share with you
> shortly. Then we could also discuss alternatives.
>
> Thanks,
> Timo
>

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Timo Walther <tw...@apache.org>.
Hi Zhenghua,

Jark is right. The reason we haven't updated those interfaces yet is
that we would actually like to introduce new interfaces. We should
target new interfaces in this release. Even a short-term fix such as the
`getRecordDataType` you proposed does not actually help, as Jingsong
pointed out, because we cannot represent tuples in DataType, and we are
not planning to support them natively, only as a structured type in
the future.

In my envisioned design, the new sink interface should always receive a
`ChangeRow`, which is never serialized and is just a data structure for
communicating between the wrapping sink function and the sink function
returned by the table sink.
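
A rough illustration of that hand-over, as a sketch only: the thread does
not define the fields of `ChangeRow`, so the change kind, the field array,
and the WrappingSinkFunction class below are all assumptions made for
illustration and are not Flink APIs. The point shown is that the row is
built in memory and passed straight to the connector's function, so no
serializer (and therefore no TypeInformation for a tuple) is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical shape of a ChangeRow; the thread does not define its fields. */
final class ChangeRow {
    enum Kind { UPSERT, DELETE }

    private final Kind kind;
    private final Object[] fields;

    ChangeRow(Kind kind, Object... fields) {
        this.kind = kind;
        this.fields = fields;
    }

    Kind getKind() {
        return kind;
    }

    Object getField(int pos) {
        return fields[pos];
    }

    @Override
    public String toString() {
        return kind + Arrays.toString(fields);
    }
}

/** Hypothetical wrapper that the framework would place in front of the connector's function. */
final class WrappingSinkFunction {
    private final Consumer<ChangeRow> connectorFunction;

    WrappingSinkFunction(Consumer<ChangeRow> connectorFunction) {
        this.connectorFunction = connectorFunction;
    }

    /** Builds the ChangeRow in memory and hands it over directly; no serializer is involved. */
    void invoke(boolean isUpsert, Object... fields) {
        ChangeRow.Kind kind = isUpsert ? ChangeRow.Kind.UPSERT : ChangeRow.Kind.DELETE;
        connectorFunction.accept(new ChangeRow(kind, fields));
    }
}

final class ChangeRowSketch {
    /** Sends one upsert and one delete through the wrapper and records what the connector saw. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        WrappingSinkFunction wrapper =
                new WrappingSinkFunction(row -> received.add(row.toString()));
        wrapper.invoke(true, "user-1", 42);
        wrapper.invoke(false, "user-1", 42);
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```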

Let me sketch a rough design document that I will share with you 
shortly. Then we could also discuss alternatives.

Thanks,
Timo


On 04.02.20 04:18, Zhenghua Gao wrote:
> Hi Jark, thanks for your comments.
>>>> IIUC, the framework will only recognize getRecordDataType and
>>>> ignore getConsumedDataType for UpsertStreamTableSink, is that right?
> You are right.
> 
>>>> getRecordDataType is a little confusing as UpsertStreamTableSink already has
>>>> three getXXXType().
> getRecordType and getOutputType are deprecated and kept mainly for backward
> compatibility.
> 
> *Best Regards,*
> *Zhenghua Gao*
> 


Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Zhenghua Gao <do...@gmail.com>.
Hi Jark, thanks for your comments.
>>>IIUC, the framework will only recognize getRecordDataType and
>>>ignore getConsumedDataType for UpsertStreamTableSink, is that right?
You are right.

>>>getRecordDataType is a little confusing as UpsertStreamTableSink already has
>>>three getXXXType().
getRecordType and getOutputType are deprecated and kept mainly for backward
compatibility.

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 10:11 PM Jark Wu <im...@gmail.com> wrote:

> Thanks, Zhenghua, for starting this discussion.
>
> Currently, none of the UpsertStreamTableSinks can upgrade to the new type
> system, which hurts usability a lot.
> I hope we can fix that in 1.11.
>
> I'm fine with *getRecordDataType* as a temporary solution.
> IIUC, the framework will only recognize getRecordDataType and
> ignore getConsumedDataType for UpsertStreamTableSink, is that right?
>
> I guess Timo is planning to design a new source/sink interface which will
> also fix this problem, but I'm not sure about the timeline. cc @Timo
> It would be better if we could have a new and complete interface, because
> getRecordDataType is a little confusing, as UpsertStreamTableSink already
> has three getXXXType() methods.
>
> Best,
> Jark

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Jark Wu <im...@gmail.com>.
Thanks, Zhenghua, for starting this discussion.

Currently, none of the UpsertStreamTableSinks can upgrade to the new type
system, which hurts usability a lot.
I hope we can fix that in 1.11.

I'm fine with *getRecordDataType* as a temporary solution.
IIUC, the framework will only recognize getRecordDataType and
ignore getConsumedDataType for UpsertStreamTableSink, is that right?

I guess Timo is planning to design a new source/sink interface which will
also fix this problem, but I'm not sure about the timeline. cc @Timo
It would be better if we could have a new and complete interface, because
getRecordDataType is a little confusing, as UpsertStreamTableSink already
has three getXXXType() methods.

Best,
Jark


On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <do...@gmail.com> wrote:

> Hi Jingsong, for now, only UpsertStreamTableSink and
> RetractStreamTableSink consume JTuple2, so the 'getConsumedDataType'
> interface is not necessary in the validation and codegen phases.
> See
>
> https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52
>  and
>
> https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304
>
> What about keeping it as it is and continuing to use the RAW type?
>
> *Best Regards,*
> *Zhenghua Gao*

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Zhenghua Gao <do...@gmail.com>.
Hi Jingsong, for now, only UpsertStreamTableSink and
RetractStreamTableSink consume JTuple2, so the 'getConsumedDataType'
interface is not necessary in the validation and codegen phases.
See
https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52
 and
https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304
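
For readers unfamiliar with the JTuple2 mentioned above: both sinks consume
Flink's Tuple2<Boolean, T>, where the Boolean flag marks the kind of change.
Below is a minimal sketch of the upsert case, assuming the flag semantics of
UpsertStreamTableSink (true = upsert, false = delete). FlaggedRecord,
UserCount, and UpsertSketch are hypothetical stand-ins written for this
example and are not Flink classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-in for Flink's Tuple2<Boolean, T>: f0 is the change flag, f1 the record. */
final class FlaggedRecord<T> {
    final boolean f0;
    final T f1;

    FlaggedRecord(boolean f0, T f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

/** Hypothetical record type with a key (user) and a value (count). */
final class UserCount {
    final String user;
    final int count;

    UserCount(String user, int count) {
        this.user = user;
        this.count = count;
    }
}

/** Hypothetical upsert-style consumer that keeps the latest count per user. */
final class UpsertSketch {
    private final Map<String, Integer> table = new LinkedHashMap<>();

    /** f0 == true upserts the record by key; f0 == false deletes the key. */
    void consume(FlaggedRecord<UserCount> change) {
        if (change.f0) {
            table.put(change.f1.user, change.f1.count);
        } else {
            table.remove(change.f1.user);
        }
    }

    Map<String, Integer> snapshot() {
        return table;
    }

    /** Applies a small change stream and returns the resulting table contents. */
    static Map<String, Integer> run() {
        UpsertSketch sink = new UpsertSketch();
        sink.consume(new FlaggedRecord<>(true, new UserCount("a", 1)));
        sink.consume(new FlaggedRecord<>(true, new UserCount("b", 2)));
        sink.consume(new FlaggedRecord<>(true, new UserCount("a", 3)));  // overwrites a
        sink.consume(new FlaggedRecord<>(false, new UserCount("b", 2))); // deletes b
        return sink.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the envelope is a tuple rather than a row, it is exactly the part
that has no natural counterpart in DataType.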

What about keeping it as it is and continuing to use the RAW type?

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <ji...@gmail.com> wrote:

> Hi Zhenghua,
>
> The *getRecordDataType* looks good to me.
>
> However, the main problem is how to represent the tuple type in DataType.
> As I understand it, this requires StructuredType, but the planner does
> not support StructuredType at present, so the alternative is to add
> StructuredType support to the planner.
>
> Best,
> Jingsong Lee

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Jingsong Li <ji...@gmail.com>.
Hi Zhenghua,

The *getRecordDataType* looks good to me.

However, the main problem is how to represent the tuple type in DataType.
As I understand it, this requires StructuredType, but the planner does
not support StructuredType at present, so the alternative is to add
StructuredType support to the planner.

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <yk...@gmail.com> wrote:

> Would overriding `getConsumedDataType` do the job?
>
> Best,
> Kurt


Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Posted by Kurt Young <yk...@gmail.com>.
Would overriding `getConsumedDataType` do the job?

Best,
Kurt

