Posted to dev@flink.apache.org by Timo Walther <tw...@apache.org> on 2020/08/19 08:22:10 UTC

[DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Hi everyone,

I would like to propose a FLIP that aims to resolve the remaining 
shortcomings in the Table API:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

The Table API has received many new features over the last year. It 
supports a new type system (FLIP-37), connectors support changelogs 
(FLIP-95), we have well-defined internal data structures (FLIP-95), 
support for result retrieval in an interactive fashion (FLIP-84), and 
soon new TableDescriptors (FLIP-129).

However, the interfaces from and to DataStream API have not been touched 
during the introduction of these new features and are kind of outdated. 
The interfaces lack important functionality that is available in Table 
API but not exposed to DataStream API users. DataStream API is still our 
most important API, which is why good interoperability is crucial.

This FLIP is a mixture of different topics that improve the 
interoperability between DataStream and Table API in terms of:

- DataStream <-> Table conversion
- translation of type systems TypeInformation <-> DataType
- schema definition (incl. rowtime, watermarks, primary key)
- changelog handling
- row handling in DataStream API

I'm looking forward to your feedback.

Regards,
Timo

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi all,

A comment from my side on the topic of the current, weird
renaming/naming/reordering when registering a DataStream. It might be
just me, but I find it extremely confusing and I would be really, really
happy if we could simplify it.

I really don't like that the actual behaviour of this method depends on
the input type and set of used operations.

See some examples:

    public static class TestPojo {
        public int a;
        public String b;
        public long c;
    }

        DataStreamSource<TestPojo> ds = env.fromElements(new TestPojo());

        // reordering of the fields
        Table table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c"));
        table.printSchema();

        // reordering with renaming
        table = tableEnv.fromDataStream(ds, $("b"), $("a"), $("c").as("d"));
        table.printSchema();

        // projecting out the 1st field
        table = tableEnv.fromDataStream(ds, $("b"), $("c"));
        table.printSchema();

        DataStreamSource<Tuple3<Integer, String, Long>> ds1 =
            env.fromElements(Tuple3.of(1, "a", 1L));

        // RENAMING without reordering!!! even though exact same
        // arguments as in the 1st example
        table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c"));
        table.printSchema();

        // projecting out the 3rd field, even though exact same
        // arguments as in the 3rd example
        table = tableEnv.fromDataStream(ds1, $("b"), $("c"));
        table.printSchema();

        // illegal renaming, an exception is thrown
        table = tableEnv.fromDataStream(ds1, $("b"), $("a"), $("c").as("d"));
        table.printSchema();

Why can't we use established operations such as projections that always
behave the same, where a field reference is always a field reference (in
the current solution it is either a reference or an alias), as described
in the FLIP?

If being able to rename the fields without knowing their original names is
a must (I agree it is useful for tuples), I would very much prefer to see:

tableEnv.fromDataStream(ds, "b", "a", "c"); <- always rename based on the
index; projections can then be applied on top.
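For illustration, a minimal sketch of the semantics I have in mind
(hypothetical, not the current API): the String variant would only ever
rename by index, and any reordering or dropping of fields would be an
ordinary projection:

    // rename the three fields of the Tuple3 strictly by position
    Table renamed = tableEnv.fromDataStream(ds1, "b", "a", "c");

    // reordering/dropping is then a regular projection that always
    // behaves the same, regardless of the input type
    Table projected = renamed.select($("a"), $("c").as("d"));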

Again, maybe I am the only one who finds it extremely confusing.

Best,

Dawid

On 02/09/2020 11:47, Jark Wu wrote:
> Hi Timo,
>
> 1. "fromDataStream VS fromInsertStream"
> In terms of naming, personally, I prefer `fromDataStream`,
> `fromChangelogStream`, `toDataStream`, `toChangelogStream` than
> `fromInsertStream`, `toInsertStream`.
>
> 2.  "fromDataStream(DataStream, Expression...) VS
> fromInsertStream(DataStream).select()"
> "fromDataStream" supports reference input fields by position, and fields
> are simply renamed.
> I think this is handy, however it is not supported in
> "fromInsertStream(DataStream).select()".
> Is it possible to keep using `fromDataStream(DataStream, Expression...)`
> but deprecate the support of `.rowtime()` and `.proctime()`?
> Instead, users should call `system_rowtime()` and `system_proctime()` if
> they want to derive the time attribute, e.g.
>
> DataStream<Tuple2<String, Long>> stream = ...
> Table table = tableEnv.fromDataStream(stream,
>    $("a"), // rename the first field to 'a'
>    $("b"), // rename the second field to 'b'
>    system_rowtime().as("rowtime"), // extract the internally attached timestamp into an event-time attribute
>    system_proctime().as("proctime"));
>
> I think this will be more inline fluent, easy to validate, and make it
> possible to use the existing API. What do you think?
>
> 3. "name-based setters should always be based on fieldNames"
> +1 to have constant fieldName->index mapping. It will be more
> straightforward and avoid confusion.
> We can still introduce the dynamic field index mapping in the future if
> needed.
>
> Best,
> Jark
>
> On Wed, 2 Sep 2020 at 16:19, Timo Walther <tw...@apache.org> wrote:
>
>> Hi everyone
>>
>> thanks for your feedback. It's a lot of content that needs to be
>> digested. I will update the FLIP shortly to incorporate some of the
>> feedback already. But let me respond to some topics first:
>>
>> "not deprecate these API", "the API of the table layer is changing too
>> fast"
>>
>> I agree that deprecating API is definitely not great for users, but in
>> this case I think it is for the greater good: it makes the API more
>> understandable and focuses on common use cases for the future. I would
>> rather say that the API is about to settle because there are only a couple
>> of shortcomings left and the bigger picture is clearer than ever. IMO
>> The proposed changes are one of the last bigger API changes on the
>> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
>> methods just because we changed so much in the last releases should not
>> be a reason to keep confusing API. Users are happy to upgrade if they
>> also get more features by upgrading (e.g. fromChangelogStream).
>>
>> 1. "fromDataStream VS fromInsertStream"
>>
>> The main reason to change this API is to have the possibility to update
>> the type mapping without breaking backwards compatibility. The name
>> `fromInsertStream` makes it possible to have new semantics and makes
>> concepts more explicit by naming.
>>
>> 2. "toAppendStream VS toInsertStream"
>>
>> "Append" is common in the Flink community but the outside world uses
>> "insert". Actually, the term "append-only table" is wrong because SQL
>> tables have bag semantics without any order. So "appending" is more of
>> an "insertion". This is also represented in FLIP-95's `RowKind` where we
>> call the concepts INSERT and `ChangelogKind.insertOnly`.
>>
>> 3. "`.rowtime()` and `.proctime()`"
>>
>> "API is also widely used, even in our test code"
>>
>> The test code is already quite outdated and uses a lot of deprecated
>> API. We need to deal with that with a better testing infrastructure. But
>> this can be future work.
>>
>> "users have already accepted it"
>>
>> I'm not sure if users have already accepted it. I think we get at least
>> one question around this topic every week because users would like to
>> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
>> And specifying all fields just to give the StreamRecord timestamp a name
>> should be made easier. This is necessary in 80% of all use cases.
>>
>> 4. "toAppendStream(Table table, Class<T>/TypeInformation)"
>>
>> The DataType system is way easier than the TypeInformation system
>> because it provides a consistent look and feel with a lot of utilities.
>> E.g. many users didn't know that they can just pass `Row.class` in the
>> past. Actually extracting types from a `Row.class` is not supported by
>> the TypeExtractor (we recently even printed a warning to the logs) but
>> we hacked some logic into the method. With AbstractDataType, users can
>> still use classes via `DataTypes.of`; for example
>> `toInsertStream(DataTypes.of(MyPojo.class))`.
>>
>> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>>
>> Similar to `TableEnvironment.execute()` we made some mistakes during the
>> big refactorings. IMHO tEnv#createTemporaryView was one mistake because
>> we introduced it too quickly. In general this method is correct, but now
>> we cannot change the underlying semantics again without breaking
>> existing pipelines. We could keep this method and just change the type
>> system under the hood, in most of the cases the pipeline should still
>> work but we cannot guarantee this due to slight differences.
>>
>> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>,
>> ChangelogMode)"
>>
>> No, this is not possible, because T records have no changeflag. Without a
>> changeflag, a ChangelogMode does not make much sense. That's why
>> `from/toChangelogStream` supports only `Row` whereas the
>> `from/toInsertStream` accepts arbitrary type classes.
>>
>> 7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"
>>
>> I also thought about this method and using `Schema` there. However, with
>> a schema you cannot specify the data type of the top-level record
>> itself. We would need to offer fromDataStream(dataStream, Schema,
>> DataType) or integrate the DataType into the Schema class itself which
>> would mix up the concepts.
>>
>> 8. "name-based setters should always be based on fieldNames"
>>
>> I'm fine with throwing an exception if my mentioned semantics are too
>> confusing.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 02.09.20 07:25, Jingsong Li wrote:
>>>> a Row has two modes represented by an internal boolean flag
>>> `hasFieldOrder`
>>>
>>> +1 to Dawid's confusion about what the result is when index-based setters
>>> and name-based setters are mixed.
>>> And name-based setters look like append instead of set.
>>>
>>> It reminds me of Avro's `GenericRecord`. We should support real random
>>> name-based setters instead of append.
>>>
>>> So, what I think is, name-based setters should always be based
>>> on fieldNames just like name-based getters. Otherwise, throw an
>> exception.
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yu...@gmail.com> wrote:
>>>
>>>> Timo, Thanks for the discussion
>>>>
>>>> I have only read the "Conversion of DataStream to Table" part so i would
>>>> only put some objections there ~
>>>>
>>>>> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
>>>> At first glance, from the perspective of a user, I'm confused by why we
>>>> must distinguish on the API level what a data stream is, e.g. an insert
>>>> stream or whatever other kind of stream.
>>>>
>>>> A user does not expect to have to distinguish between several
>>>> datastream options. The framework should have the ability to infer the
>>>> ChangelogMode of the stream, but sadly we cannot at the moment, because
>>>> we do not have metadata to describe the ChangelogMode, which is what the
>>>> framework actually needs.
>>>>
>>>> And could it be:
>>>>
>>>> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode)
>>>> where the ChangelogMode is optional because 90% of the datastreams are
>>>> insert-only for now.
>>>>
>>>> or:
>>>>
>>>> DataStream.withChangelogMode(ChangelogMode) so that DataStream can be
>>>> self-describing what kind of stream it is (again, if not specified, the
>>>> default is INSERT).
>>>>
>>>>> tEnv
>>>>> .fromInsertStream(DataStream<T>)
>>>>> .select('*, system_rowtime().as("rowtime"),
>>>>> system_proctime().as("proctime"))
>>>>
>>>> In order to declare the time-attributes on datastream, i must say I
>> prefer
>>>> tEnv.fromDataStream(dataStream, Schema) for these reasons:
>>>>
>>>> - Schema is the uniform interface to declare the metadata for a table in
>>>> the Table/SQL API, with an imperative coding style, in Descriptor API we
>>>> also use it for the time-attributes purpose
>>>> - Using a projection for time attributes is not a good idea, because on
>>>> the SQL side we declare them as metadata that is part of the table schema
>>>> when we define the DDL. Although we may explain the DDL internally using
>>>> computed columns, that does not mean we must do that explicitly in the
>>>> DataStream API. In the SQL world, no projection function outputs a
>>>> time-attribute type; we had better keep the time attributes in the scope
>>>> of the table metadata.
>>>>
>>>> Best,
>>>> Danny Chan
>>>> On 2020-08-19 at 16:22 +0800, Timo Walther <tw...@apache.org> wrote:
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>> shortcomings in the Table API:
>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>> The Table API has received many new features over the last year. It
>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>> soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to DataStream API have not been
>> touched
>>>>> during the introduction of these new features and are kind of outdated.
>>>>> The interfaces lack important functionality that is available in Table
>>>>> API but not exposed to DataStream API users. DataStream API is still
>> our
>>>>> most important API, which is why good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the
>>>>> interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>
>>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Hi Timo,

1. "fromDataStream VS fromInsertStream"
In terms of naming, personally, I prefer `fromDataStream`,
`fromChangelogStream`, `toDataStream`, `toChangelogStream` than
`fromInsertStream`, `toInsertStream`.

2.  "fromDataStream(DataStream, Expression...) VS
fromInsertStream(DataStream).select()"
"fromDataStream" supports reference input fields by position, and fields
are simply renamed.
I think this is handy, however it is not supported in
"fromInsertStream(DataStream).select()".
Is it possible to keep using `fromDataStream(DataStream, Expression...)`
but deprecate the support of `.rowtime()` and `.proctime()`?
Instead, users should call `system_rowtime()` and `system_proctime()` if
they want to derive the time attribute, e.g.

DataStream<Tuple2<String, Long>> stream = ...
Table table = tableEnv.fromDataStream(stream,
   $("a"), // rename the first field to 'a'
   $("b"), // rename the second field to 'b'
   system_rowtime().as("rowtime"), // extract the internally attached timestamp into an event-time attribute
   system_proctime().as("proctime"));

I think this will be more inline fluent, easy to validate, and make it
possible to use the existing API. What do you think?

3. "name-based setters should always be based on fieldNames"
+1 to have constant fieldName->index mapping. It will be more
straightforward and avoid confusion.
We can still introduce the dynamic field index mapping in the future if
needed.
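
As a tiny illustrative sketch (not an API proposal), the constant mapping
could be fixed once at construction time and shared:

   // fixed once and shared by all name-based setters/getters
   Map<String, Integer> fieldIndices = new HashMap<>();
   fieldIndices.put("a", 0);
   fieldIndices.put("b", 1);
   fieldIndices.put("c", 2);

   // name-based access always resolves to the same position
   int pos = fieldIndices.get("b"); // always 1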

Best,
Jark

On Wed, 2 Sep 2020 at 16:19, Timo Walther <tw...@apache.org> wrote:

> Hi everyone
>
> thanks for your feedback. It's a lot of content that needs to be
> digested. I will update the FLIP shortly to incorporate some of the
> feedback already. But let me respond to some topics first:
>
> "not deprecate these API", "the API of the table layer is changing too
> fast"
>
> I agree that deprecating API is definitely not great for users, but in
> this case I think it is for the greater good: it makes the API more
> understandable and focuses on common use cases for the future. I would
> rather say that the API is about to settle because there are only a couple
> of shortcomings left and the bigger picture is clearer than ever. IMO
> The proposed changes are one of the last bigger API changes on the
> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
> methods just because we changed so much in the last releases should not
> be a reason to keep confusing API. Users are happy to upgrade if they
> also get more features by upgrading (e.g. fromChangelogStream).
>
> 1. "fromDataStream VS fromInsertStream"
>
> The main reason to change this API is to have the possibility to update
> the type mapping without breaking backwards compatibility. The name
> `fromInsertStream` makes it possible to have new semantics and makes
> concepts more explicit by naming.
>
> 2. "toAppendStream VS toInsertStream"
>
> "Append" is common in the Flink community but the outside world uses
> "insert". Actually, the term "append-only table" is wrong because SQL
> tables have bag semantics without any order. So "appending" is more of
> an "insertion". This is also represented in FLIP-95's `RowKind` where we
> call the concepts INSERT and `ChangelogKind.insertOnly`.
>
> 3. "`.rowtime()` and `.proctime()`"
>
> "API is also widely used, even in our test code"
>
> The test code is already quite outdated and uses a lot of deprecated
> API. We need to deal with that with a better testing infrastructure. But
> this can be future work.
>
> "users have already accepted it"
>
> I'm not sure if users have already accepted it. I think we get at least
> one question around this topic every week because users would like to
> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
> And specifying all fields just to give the StreamRecord timestamp a name
> should be made easier. This is necessary in 80% of all use cases.
>
> 4. "toAppendStream(Table table, Class<T>/TypeInformation)"
>
> The DataType system is way easier than the TypeInformation system
> because it provides a consistent look and feel with a lot of utilities.
> E.g. many users didn't know that they can just pass `Row.class` in the
> past. Actually extracting types from a `Row.class` is not supported by
> the TypeExtractor (we recently even printed a warning to the logs) but
> we hacked some logic into the method. With AbstractDataType, users can
> still use classes via `DataTypes.of`; for example
> `toInsertStream(DataTypes.of(MyPojo.class))`.
>
> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>
> Similar to `TableEnvironment.execute()` we made some mistakes during the
> big refactorings. IMHO tEnv#createTemporaryView was one mistake because
> we introduced it too quickly. In general this method is correct, but now
> we cannot change the underlying semantics again without breaking
> existing pipelines. We could keep this method and just change the type
> system under the hood, in most of the cases the pipeline should still
> work but we cannot guarantee this due to slight differences.
>
> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>,
> ChangelogMode)"
>
> No, this is not possible, because T records have no changeflag. Without a
> changeflag, a ChangelogMode does not make much sense. That's why
> `from/toChangelogStream` supports only `Row` whereas the
> `from/toInsertStream` accepts arbitrary type classes.
>
> 7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"
>
> I also thought about this method and using `Schema` there. However, with
> a schema you cannot specify the data type of the top-level record
> itself. We would need to offer fromDataStream(dataStream, Schema,
> DataType) or integrate the DataType into the Schema class itself which
> would mix up the concepts.
>
> 8. "name-based setters should always be based on fieldNames"
>
> I'm fine with throwing an exception if my mentioned semantics are too
> confusing.
>
> Regards,
> Timo
>
>
>
> On 02.09.20 07:25, Jingsong Li wrote:
> >> a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`
> >
> > +1 to Dawid's confusion about what the result is when index-based setters
> > and name-based setters are mixed.
> > And name-based setters look like append instead of set.
> >
> > It reminds me of Avro's `GenericRecord`. We should support real random
> > name-based setters instead of append.
> >
> > So, what I think is, name-based setters should always be based
> > on fieldNames just like name-based getters. Otherwise, throw an
> exception.
> >
> > Best,
> > Jingsong
> >
> > On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yu...@gmail.com> wrote:
> >
> >> Timo, Thanks for the discussion
> >>
> >> I have only read the "Conversion of DataStream to Table" part so i would
> >> only put some objections there ~
> >>
> >>> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
> >>
> >> At first glance, from the perspective of a user, I'm confused by why we
> >> must distinguish on the API level what a data stream is, e.g. an insert
> >> stream or whatever other kind of stream.
> >>
> >> A user does not expect to have to distinguish between several
> >> datastream options. The framework should have the ability to infer the
> >> ChangelogMode of the stream, but sadly we cannot at the moment, because
> >> we do not have metadata to describe the ChangelogMode, which is what the
> >> framework actually needs.
> >>
> >> And could it be:
> >>
> >> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode)
> >> where the ChangelogMode is optional because 90% of the datastreams are
> >> insert-only for now.
> >>
> >> or:
> >>
> >> DataStream.withChangelogMode(ChangelogMode) so that DataStream can be
> >> self-describing what kind of stream it is (again, if not specified, the
> >> default is INSERT).
> >>
> >>> tEnv
> >>> .fromInsertStream(DataStream<T>)
> >>> .select('*, system_rowtime().as("rowtime"),
> >>> system_proctime().as("proctime"))
> >>
> >> In order to declare the time-attributes on datastream, i must say I
> prefer
> >>
> >> tEnv.fromDataStream(dataStream, Schema) for these reasons:
> >>
> >> - Schema is the uniform interface to declare the metadata for a table in
> >> the Table/SQL API, with an imperative coding style, in Descriptor API we
> >> also use it for the time-attributes purpose
> >> - Using a projection for time attributes is not a good idea, because on
> >> the SQL side we declare them as metadata that is part of the table schema
> >> when we define the DDL. Although we may explain the DDL internally using
> >> computed columns, that does not mean we must do that explicitly in the
> >> DataStream API. In the SQL world, no projection function outputs a
> >> time-attribute type; we had better keep the time attributes in the scope
> >> of the table metadata.
> >>
> >> Best,
> >> Danny Chan
> >> On 2020-08-19 at 16:22 +0800, Timo Walther <tw...@apache.org> wrote:
> >>> Hi everyone,
> >>>
> >>> I would like to propose a FLIP that aims to resolve the remaining
> >>> shortcomings in the Table API:
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>
> >>> The Table API has received many new features over the last year. It
> >>> supports a new type system (FLIP-37), connectors support changelogs
> >>> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >>> support for result retrieval in an interactive fashion (FLIP-84), and
> >>> soon new TableDescriptors (FLIP-129).
> >>>
> >>> However, the interfaces from and to DataStream API have not been
> touched
> >>> during the introduction of these new features and are kind of outdated.
> >>> The interfaces lack important functionality that is available in Table
> >>> API but not exposed to DataStream API users. DataStream API is still
> our
> >>> most important API, which is why good interoperability is crucial.
> >>>
> >>> This FLIP is a mixture of different topics that improve the
> >>> interoperability between DataStream and Table API in terms of:
> >>>
> >>> - DataStream <-> Table conversion
> >>> - translation of type systems TypeInformation <-> DataType
> >>> - schema definition (incl. rowtime, watermarks, primary key)
> >>> - changelog handling
> >>> - row handling in DataStream API
> >>>
> >>> I'm looking forward to your feedback.
> >>>
> >>> Regards,
> >>> Timo
> >>
> >
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Hi,

I'm +1 to use the naming of from/toDataStream, rather than
from/toInsertStream. So we don't need to deprecate the existing
`fromDataStream`.

I'm +1 to Danny's proposal: fromDataStream(dataStream, Schema) and
toDataStream(table, AbstractDataType<?>)

I think we can also keep the method `createTemporaryView(path,
DataStream<T>)`.
I don't have a strong opinion on deprecating fromDataStream(datastream,
exprs), but slightly prefer to keep them.
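
To make this concrete, a rough sketch of how the two signatures might be
used (assuming a builder-style Schema and `DataTypes.of` as discussed in
this thread; the details were still open at this point, and `MyPojo` is a
placeholder class):

   // declare only what cannot be derived from the DataStream,
   // e.g. an event-time column and its watermark
   Table table = tEnv.fromDataStream(
       dataStream,
       Schema.newBuilder()
           .columnByMetadata("rowtime", DataTypes.TIMESTAMP(3))
           .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
           .build());

   // declare the desired physical output type for the conversion back
   DataStream<MyPojo> result =
       tEnv.toDataStream(table, DataTypes.of(MyPojo.class));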

Best,
Jark

On Wed, 9 Sep 2020 at 14:34, Danny Chan <yu...@gmail.com> wrote:

> Thanks for the summary Timo ~
>
> I want to clarify a little bit: what is the conclusion about
> fromChangelogStream and toChangelogStream, should we use these names or
> should we use fromDataStream with an optional ChangelogMode flag?
>
> Best,
> Danny Chan
> On 2020-09-08 at 20:22 +0800, Timo Walther <tw...@apache.org> wrote:
> > Hi Danny,
> >
> > Your proposed signatures sound good to me.
> >
> > fromDataStream(dataStream, Schema)
> > toDataStream(table, AbstractDataType<?>)
> >
> > They address all my concerns. The API would not be symmetric anymore,
> > but this is fine with me. Others raised concerns about deprecating
> > `fromDataStream(dataStream, Expression)`. Are they fine with this as
> well?
> >
> > If there are no objections, I would update the FLIP with the methods
> > above. But let me briefly summarize my thoughts on this again, so that we
> > are all on the same page:
> > - The biggest discussion point seems to be fromInsertStream/toInsertStream.
> > - I don’t have a strong opinion on naming, from/toDataStream would be
> > fine for me. But:
> > - It has slightly different type mappings and might break existing pipelines
> > silently. This point can be neglected as the differences are only minor.
> > - We need a way of declaring the rowtime attribute but without declaring
> > all columns again. Reduce manual schema work as much as possible.
> > - Both Dawid and I don’t like the current either “position based” or
> > “name based” expression logic that looks like a projection but is not.
> > - Actually name based expressions are not necessary, since we have
> > positions for all new data types.
> > - Schema is not suitable to influence the output type for toDataStream.
> > It should be DataType.
> >
> > All items are solved by Danny's suggestion.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 08.09.20 14:04, Danny Chan wrote:
> > > Hi, Timo ~
> > >
> > > "It is not about changelog mode compatibility, it is about the type
> > > compatibility."
> > >
> > > For fromDataStream(dataStream, Schema), there should not be a
> compatibility problem or data type inconsistency. We know the logical type
> from Schema and physical type from the dataStream itself.
> > >
> > > For toDataStream(table, AbstractDataType<?>), we can get the logical
> type from the table itself
> > > and the physical type from the passed data type.
> > >
> > > If both behaviors are deterministic, what's the problem for type
> compatibility and safety?
> > >
> > > My concern is that in most cases people use the "insert stream" and do
> > > not need to care about the data stream ChangelogMode, so there is no
> > > need to distinguish them in the APIs; an optional param is enough. If we
> > > introduce 2 new APIs there, people have to choose between them. Can
> > > fromChangelogStream() accept an insert stream? What is the behavior if
> > > fromInsertStream() accepts a changelog stream?
> > >
> > >
> > > "This means potentially duplicate definition of fields and their data
> types etc."
> > >
> > > I agree, because table already has an underlying schema there.
> > >
> > > Best,
> > > Danny Chan
> > > On 2020-09-03 at 20:12 +0800, Timo Walther <tw...@apache.org> wrote:
> > > > Hi Danny,
> > > >
> > > > "if ChangelogMode.INSERT is the default, existing pipelines should be
> > > > compatible"
> > > >
> > > > It is not about changelog mode compatibility, it is about the type
> > > > compatibility. The renaming to `toInsertStream` is only to have a
> mean
> > > > of dealing with data type inconsistencies that could break existing
> > > > pipelines.
> > > >
> > > > As the FLIP describes, the following new behavior should be
> implemented:
> > > >
> > > > - It does this by translating the TypeInformation to DataType.
> > > > - This will happen with a new TypeInfoDataTypeConverter that will no
> > > > longer produce LegacyTypeInformationType.
> > > > - All types from DataStream API should be supported by this
> converter.
> > > > - TupleTypeInfoBase will be translated into a proper RowType or
> > > > StructuredType.
> > > > - BigDecimals will be converted to DECIMAL(38,18) by default.
> > > > - Composite types (tuples, POJOs, rows) will be flattened by default
> if
> > > > they are used as top-level records (similar to the old behavior).
> > > > - The order of POJO field's is determined by the DataTypeExtractor
> and
> > > > must not be defined manually anymore.
> > > > - GenericTypeInfo is converted to RawType immediately by considering
> the
> > > > current configuration.
> > > > - A DataStream that originated from Table API will keep its DataType
> > > > information due to ExternalTypeInfo implementing DataTypeQueryable.
> > > >
> > > > I would feel safer if we do this under a new method name.
> > > >
> > > > "toDataStream(table, schema.bindTo(DataType))"
> > > >
> > > > This is what I meant with "integrate the DataType into the Schema
> class
> > > > itself". Yes, we can do that if everybody is fine with it. But why
> > > > should a user specify both a schema and a data type? This means
> > > > potentially duplicate definition of fields and their data types etc.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 03.09.20 11:31, Danny Chan wrote:
> > > > > "It is a more conservative approach to introduce that in a
> > > > > new method rather than changing the existing one under the hood and
> > > > > potentially break existing pipelines silently"
> > > > >
> > > > > I like the idea actually, but if ChangelogMode.INSERT is the
> default, existing pipelines should be compatible. We can see the other
> kinds of ChangelogMode as an extension.
> > > > >
> > > > > “for `toDataStream` users need to be
> > > > > able to express whether they would prefer Row, POJO or atomic”
> > > > >
> > > > > I think in most cases people do not need to convert the stream to a
> > > > > Row or POJO, because the table projection always returns a flattened
> > > > > internal row. If people do want a POJO there, how about we bind the
> > > > > DataType to the existing schema, like this:
> > > > >
> > > > > toDataStream(table, schema.bindTo(DataType))
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > > On 2020-09-03 at 15:18 +0800, dev@flink.apache.org wrote:
> > > > > >
> > > > > > It is a more conservative approach to introduce that in a
> > > > > > new method rather than changing the existing one under the hood
> and
> > > > > > potentially break existing pipelines silently
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
I prefer to have separate APIs for them as changelog stream requires Row
type.
It would make the API more straightforward and reduce the confusion.
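
For context, a minimal sketch of why the changelog variants are tied to
Row: the changeflag travels as the RowKind of each record, which a generic
T has no place for (assuming the RowKind flag and Row.ofKind helper
introduced with FLIP-95):

   // an update encoded as a retraction pair in a changelog stream
   Row before = Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12);
   Row after = Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100);

   // a POJO or Tuple cannot carry such a flag, which is why
   // from/toChangelogStream only accept DataStream<Row>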

Best,
Jark

On Wed, 9 Sep 2020 at 16:21, Timo Walther <tw...@apache.org> wrote:

> I had this in the initial design, but Jark had concerns at least for the
> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>
> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>
> But in this case I would vote for a symmetric API. If we keep
> toChangelogStream we should also have a fromChangelogStream.
>
> And if we unify `toChangelogStream` and `toDataStream`, retractions
> cannot be represented for non-Rows and users will experience duplicate
> records with a missing changeflag.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:31, Danny Chan wrote:
> > “But I think the planner needs to
> > know whether the input is insert-only or not.”
> >
> > Does fromDataStream(dataStream, schema, changelogMode)
> >
> > solve your concerns ?  People can pass around whatever ChangelogMode
> they like as an optional param.
> > By default: fromDataStream(dataStream, schema), the ChangelogMode is
> INSERT.
> >
> > Best,
> > Danny Chan
> > On 2020-09-09 at 14:53 +0800, dev@flink.apache.org wrote:
> >>
> >> But I think the planner needs to
> >> know whether the input is insert-only or not.
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Kurt Young <yk...@gmail.com>.
I see, I missed the part that a row is either in positioned mode or named
mode.
I can live with this. Thanks.
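
For reference, a quick sketch of the two modes using the static factories
proposed in the FLIP (withPositions/withNames, as quoted further below):

   // position-based mode: index setters/getters only (like new Row(int))
   Row positional = Row.withPositions(3);
   positional.setField(0, "Alice");

   // name-based mode: name setters/getters only; position-based
   // access throws an exception
   Row named = Row.withNames();
   named.setField("name", "Alice");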

Best,
Kurt


On Wed, Sep 23, 2020 at 9:07 PM Timo Walther <tw...@apache.org> wrote:

> But the examples you mentioned would not be different.
>
> By calling `Row.withNames()`, the row has no definition of position. All
> position-based methods would throw an exception.
>
> The hashCode()/equals() would return true for:
>
 > Row row1 = Row.withNames();
 > row1.setField("a", 1);
 > row1.setField("b", 2);
 >
 > Row row2 = Row.withNames();
 > row2.setField("b", 2);
 > row2.setField("a", 1);
>
> row2.equals(row1)
>
> The row is just a container for the serializer/converter which will
> ensure ordering.
>
> Regards,
> Timo
>
> On 23.09.20 15:00, Kurt Young wrote:
> > Thanks for the detailed response, 1-5 sounds good to me.
> >
> > For #6, I just thought of another case which would also annoy users.
> > Consider code like this:
> >
> > Row row = Row.withNames();
> > row.setField("a", 1);
> > row.setField("b", 2);
> >
> > and the second time, he changes the sequence of the setter calls:
> >
> > Row row = Row.withNames();
> > row.setField("b", 2);
> > row.setField("a", 1);
> >
> > I don't think anyone would expect these two rows are actually different.
> >
> > Instead, if we at least define the field names first, which will fix the
> > order, we would not have such side effects.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 23, 2020 at 8:47 PM Timo Walther <tw...@apache.org> wrote:
> >
> >> Hi Kurt,
> >>
> >> thanks for your feedback.
> >>
> >> 1. "moving Schema after DataStream": I don't have a strong opinion here.
> >> One could argue that the API would look similar to a CREATE TABLE
> >> statement: first schema then connector. I updated the FLIP.
> >>
> >> 2. "will we do some verification?"
> >> Yes, we will definitely do verification. It will happen based on what is
> >> available in TypeInformation.
> >>
> >> "if T is a Tuple, do we have some rules for setting field names in
> Schema?"
> >> The rule in this case would be to take the
> >> TupleTypeInfoBase.getFieldNames() similar to the logic we currently
> have.
> >>
> >> "Will we do some type coercion?"
> >> For `fromDataStream()`, type coercion between an explicitly specified
> >> Schema and DataStream will not happen (e.g. DataStream<Integer> !=
> >> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
> >> desired data type explicitly and expects correctness.
> >> For `toDataStream()`, it has similar type coercion semantics as a
> >> regular table sink (first on a logical level, then on a class level).
> >>
> >> It is difficult to list all type rules upfront, but it should behave
> >> similar to all the work done in FLIP-65 and FLIP-95. I would move the
> >> discussion about other type handling to the individual PRs. The general
> >> goal should be to stay backwards compatible but reduce manual schema
> work.
> >>
> >> 3. "How do you derive schema from DataStream<Row>"
> >>
> >> We use RowTypeInfo (if DataStream comes from DataStream API) or
> >> ExternalTypeInfo (if DataStream comes from Table API).
> >>
> >> 4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this
> >> method is necessary"
> >> Dealing with Row in DataStream API is very inconvenient. With the new
> >> data format converters, the behavior would be consistent across
> >> DataStream API and Table functions. The logic is already present and
> >> seems to be pretty stable so far. We would break a lot of existing code
> >> if we get rid of this method.
> >>
> >> 5. "How does Row behave like GenericRowData?"
> >>
> >> Row can contain StringData or further nested RowData. The data format
> >> converters support that. The conversion of fields would be a no-op in
> >> this case. In the end, both Row and GenericRowData just store an
> Object[].
> >>
> >> 6. "They would expect that all the fields they didn't set should be
> NULL."
> >>
> >> But this will be the case. The full list of all field names and their
> >> order is defined by the data type, not the Row instance. During
> >> serialization/conversion we can reorder fields, throw exceptions about
> >> unknown field names, and set remaining fields to NULL.
> >>
> >> If a user uses `new Row(5)` but the serializer is configured by a data
> >> type that only supports `Row(3)`, it will also throw an exception during
> >> runtime. We cannot guard users from creating invalid rows. But the
> >> strongly typed serializers/converters will do the final verification.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 23.09.20 12:08, Kurt Young wrote:
> >>> Sorry for being late, I went through the design doc and here are
> >>> my comments:
> >>>
> >>> 1. A minor one, how about moving Schema after DataStream in all
> affected
> >>> APIs? Such as:
> >>> StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
> >>> StreamTableEnvironment.createTemporaryView(String, Schema,
> >> DataStream<T>):
> >>> Unit
> >>> StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>):
> >> Table
> >>> StreamTableEnvironment.toChangelogStream(Schema, Table):
> DataStream<Row>
> >>>
> >>> It will look more aligned with APIs which don't have Schema. For
> example:
> >>> StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> >>> StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table
> >>>
> >>> 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
> >>> DataStream<T>): Table
> >>> How do we convert the types between Schema and T, will we do some
> >>> verification? Will we do some type coercion? For example,
> >>> can we support Schema.LONG with DataStream<Integer>? And if T is a
> Tuple,
> >>> do we have some rules for setting field names in Schema?
> >>> I can see lots of imagination from this method but the rules are
> unclear
> >> to
> >>> me.
> >>>
> >>> 3. A question to:
> >> StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
> >>> Table
> >>> How do you derive schema from DataStream<Row>?
> >>>
> >>> 4. A question to:
> >> StreamTableEnvironment.toDataStream(AbstractDataType<?>,
> >>> Table): DataStream<T>
> >>> I'm wondering whether this method is necessary. Always getting a
> >>> DataStream<Row> from the table and then manually applying some
> >>> map function seems not cumbersome, and safer (such intelligent
> >>> conversion always seems error prone to me).
> >>>
> >>> 5.
> >>>> The `toChangelogStream(Schema, Table)` exists for completeness to
> have a
> >>> symmetric API.
> >>>> It allows for declaring the data type for output similar to
> >>> DynamicTableSinks.
> >>>> Additionally, internal structures such as StringData, TimestampData
> can
> >>> still be used by power users.
> >>>> In that sense, Row can behave like a GenericRowData.
> >>>
> >>> How does Row behave like GenericRowData? I don't think Row can work
> with
> >>> RowData for now.
> >>>
> >>> 6. Row.withNames() seems dangerous to me. It relies on users setting all
> >> the
> >>> fields they need during `setField(String name, T value)`.
> >>> It's also highly possible that users would not set certain fields when
> >> for
> >>> example some fields are NULL. They would expect that all the fields
> >>> they didn't set should be NULL.
> >>> Row.withNames(String[] fieldNames) or Row.withNames(List<String>
> >>> fieldNames) seems to be a safer choice.
> >>> I agree that simplicity is important but making API safer to use is
> also
> >>> important.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <tw...@apache.org>
> wrote:
> >>>
> >>>> Hi Jark,
> >>>>
> >>>> thanks for your feedback. I removed `withNamesAndPositions` from the
> >>>> public API list and added a comment that this is only internal API for
> >>>> converters and serializers.
> >>>>
> >>>> I would start a new vote tomorrow if there are no objections.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 23.09.20 08:55, Jark Wu wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> Sorry for the late reply.
> >>>>> I think it would be great if we can make `withNamesAndPositions`
> >> internal
> >>>>> visible. This reduces the complexity of the public API.
> >>>>> It's hard to come up with a perfect solution. So let's move on this
> >> FLIP.
> >>>>> I don't have other concerns.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org>
> wrote:
> >>>>>
> >>>>>> Hi Jark,
> >>>>>>
> >>>>>> the fieldNames map is not intended for users. I would also be fine
> to
> >>>>>> make it a default scope constructor and access it with some internal
> >>>>>> utility class next to the Row class. The fieldNames map must only be
> >>>>>> used by serializers and converters. A user has no benefit in using
> it.
> >>>>>>
> >>>>>> For the creation of new rows (without reusing, which only advanced
> >> users
> >>>>>> usually do), I don't see a benefit of having:
> >>>>>>
> >>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>>>>> reuse.setField("myField", 12);
> >>>>>> reuse.setField("myOtherField", "This is a test");
> >>>>>>
> >>>>>> The purpose of Row.withNames() is to create a Row easily and
> >>>>>> readably without declaring 50+ column names or dealing with
> >>>>>> indices in this range.
> >>>>>>
> >>>>>> Personally, I would like to make Row an interface and have concrete
> >> row
> >>>>>> implementations for different purposes but this would break existing
> >>>>>> programs too much.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 18.09.20 11:04, Jark Wu wrote:
> >>>>>>> Personally I think the fieldNames Map is confusing and not handy.
> >>>>>>> I just have an idea but not sure what you think.
> >>>>>>> What about adding a new constructor with List field names? This
> >>>>>>> enables all name-based setters/getters.
> >>>>>>> Regarding the List -> Map cost for every record, we can suggest
> >>>>>>> that users reuse the Row in the task.
> >>>>>>>
> >>>>>>> new Row(int arity)
> >>>>>>> new Row(List<String> fieldNames)
> >>>>>>>
> >>>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>>>>>> reuse.setField("myField", 12);
> >>>>>>> reuse.setField("myOtherField", "This is a test");
> >>>>>>>
> >>>>>>> My point is that, if we can have a handy constructor for named Row,
> >> we
> >>>>>> may
> >>>>>>> not need to distinguish the named-only or positionAndNamed mode.
> >>>>>>> This can avoid (fast-fail) the potential problem when setting an
> >>>> invalid
> >>>>>>> field.
> >>>>>>>
> >>>>>>> We can also come up with a new class for the field names which will
> >>>>>>> construct the Map and be shared among all Row instances.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org>
> >> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> thanks for all the feedback. I updated the FLIP again on Thursday
> to
> >>>>>>>> integrate the feedback I got from Jingsong and Jark offline. In
> >>>>>>>> particular I updated the `Improve dealing with Row in DataStream
> >> API`
> >>>>>>>> section another time. We introduced static methods for Row that
> >> should
> >>>>>>>> make the semantics clear to users:
> >>>>>>>>
> >>>>>>>> // allows to use index-based setters and getters (equivalent to
> new
> >>>>>>>> Row(int))
> >>>>>>>> // method exists for completeness
> >>>>>>>> public static withPositions(int length);
> >>>>>>>>
> >>>>>>>> // allows to use name-based setters and getters
> >>>>>>>> public static withNames();
> >>>>>>>>
> >>>>>>>> // allows to use both name-based and position-based setters and
> >>>> getters
> >>>>>>>> public static withNamesAndPositions(Map<String, Integer>
> >> fieldNames);
> >>>>>>>>
> >>>>>>>> In any case, none of the existing methods will be deprecated and
> only
> >>>>>>>> additional functionality will be available through the methods
> >> above.
> >>>>>>>>
> >>>>>>>> I started a voting thread on Friday. Please feel free to vote.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 10.09.20 10:21, Danny Chan wrote:
> >>>>>>>>> Thanks for driving this Timo, +1 for voting ~
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Danny Chan
> >>>>>>>>> On 2020-09-10 at 15:54 +0800, Timo Walther <tw...@apache.org> wrote:
> >>>>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP
> >> with
> >>>>>> the
> >>>>>>>>>> outcome. I think the result is one of the last core API
> >> refactoring
> >>>>>> and
> >>>>>>>>>> users will be happy to have a consistent changelog support.
> Thanks
> >>>> for
> >>>>>>>>>> all the contributions.
> >>>>>>>>>>
> >>>>>>>>>> If there are no objections, I would continue with a voting.
> >>>>>>>>>>
> >>>>>>>>>> What do you think?
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 09.09.20 14:31, Danny Chan wrote:
> >>>>>>>>>>> Thanks, i'm fine with that.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, 2020-09-09 at 19:02, Timo Walther <tw...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I agree with Jark. It reduces confusion.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The DataStream API doesn't know changelog processing at all. A
> >>>>>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
> >>>>>>>>>>>> `fromChangelogStream`. But only the latter API will interpret
> it
> >>>> as
> >>>>>> a
> >>>>>>>>>>>> changelog something.
> >>>>>>>>>>>>
> >>>>>>>>>>>> And as I mentioned before, the `toChangelogStream` must work
> >> with
> >>>>>> Row
> >>>>>>>>>>>> otherwise users are confused due to duplicate records with a
> >>>> missing
> >>>>>>>>>>>> changeflag.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will update the FLIP-136 a last time. I hope we can then
> >>>> continue
> >>>>>>>> to a
> >>>>>>>>>>>> vote.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>>>>>>>>>> I think it would bring in much confusion by a different API
> >> name
> >>>>>> just
> >>>>>>>>>>>> because the DataStream generic type is different.
> >>>>>>>>>>>>> If there are ChangelogMode that only works for Row, can we
> >> have a
> >>>>>>>> type
> >>>>>>>>>>>> check there ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Switch to a new API name does not really solve the problem
> >> well,
> >>>>>>>> people
> >>>>>>>>>>>> still need to declare the ChangelogMode explicitly, and there
> >> are
> >>>>>> some
> >>>>>>>>>>>> confusions:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> • Should DataStream of Row type always use
> >> #fromChangelogStream ?
> >>>>>>>>>>>>> • Does fromChangelogStream works for only INSERT
> ChangelogMode
> >> ?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>> On 2020-09-09 at 16:21 +0800, Timo Walther <twalthr@apache.org> wrote:
> >>>>>>>>>>>>>> I had this in the initial design, but Jark had concerns at
> >> least
> >>>>>> for
> >>>>>>>> the
> >>>>>>>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> >>>>>>>> possible.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> But in this case I would vote for a symmetric API. If we
> keep
> >>>>>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
> >>>>>> retractions
> >>>>>>>>>>>>>> cannot be represented for non-Rows and users will experience
> >>>>>>>> duplicate
> >>>>>>>>>>>>>> records with a missing changeflag.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>>>>>>>>>> “But I think the planner needs to
> >>>>>>>>>>>>>>> know whether the input is insert-only or not.”
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> solve your concerns ? People can pass around whatever
> >>>>>> ChangelogMode
> >>>>>>>>>>>> they like as an optional param.
> >>>>>>>>>>>>>>> By default: fromDataStream(dataStream, schema), the
> >>>> ChangelogMode
> >>>>>>>> is
> >>>>>>>>>>>> INSERT.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>> On 2020-09-09 at 14:53 +0800, dev@flink.apache.org wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> But I think the planner needs to
> >>>>>>>>>>>>>>>> know whether the input is insert-only or not.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
But the examples you mentioned would not be different.

By calling `Row.withNames()`, the row has no definition of position. All 
position-based methods would throw an exception.

The hashCode()/equals() would return true for:

 > Row row1 = Row.withNames();
 > row1.setField("a", 1);
 > row1.setField("b", 2);
 >
 > Row row2 = Row.withNames();
 > row2.setField("b", 2);
 > row2.setField("a", 1);

row2.equals(row1)

The row is just a container for the serializer/converter which will 
ensure ordering.
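
Conceptually, the name-based mode behaves like a map from field name to
value, which is why insertion order cannot make two rows different. A
rough sketch of the semantics (not the actual implementation):

   // name-based mode is semantically a name -> value mapping
   Map<String, Object> r1 = new HashMap<>();
   r1.put("a", 1);
   r1.put("b", 2);

   Map<String, Object> r2 = new HashMap<>();
   r2.put("b", 2);
   r2.put("a", 1);

   // insertion order does not matter: r1.equals(r2) is true;
   // the serializer maps names to fixed positions via the data type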

Regards,
Timo

On 23.09.20 15:00, Kurt Young wrote:
> Thanks for the detailed response, 1-5 sounds good to me.
> 
> For #6, I just thought of another case which would also annoy users. Consider
> code like this:
> 
> Row row = Row.withNames();
> row.setField("a", 1);
> row.setField("b", 2);
> 
> and the second time, he changes the sequence of the setter calls:
> 
> Row row = Row.withNames();
> row.setField("b", 2);
> row.setField("a", 1);
> 
> I don't think anyone would expect these two rows are actually different.
> 
> Instead, if we at least define the field names first, which will fix the
> order, we would not have such side effects.
> 
> Best,
> Kurt
> 
> 
> On Wed, Sep 23, 2020 at 8:47 PM Timo Walther <tw...@apache.org> wrote:
> 
>> Hi Kurt,
>>
>> thanks for your feedback.
>>
>> 1. "moving Schema after DataStream": I don't have a strong opinion here.
>> One could argue that the API would look similar to a CREATE TABLE
>> statement: first schema then connector. I updated the FLIP.
>>
>> 2. "will we do some verification?"
>> Yes, we will definitely do verification. It will happen based on what is
>> available in TypeInformation.
>>
>> "if T is a Tuple, do we have some rules for setting field names in Schema?"
>> The rule in this case would be to take the
>> TupleTypeInfoBase.getFieldNames() similar to the logic we currently have.
>>
>> "Will we do some type coercion?"
>> For `fromDataStream()`, type coercion between an explicitly specified
>> Schema and DataStream will not happen (e.g. DataStream<Integer> !=
>> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
>> desired data type explicitly and expects correctness.
>> For `toDataStream()`, it has similar type coercion semantics as a
>> regular table sink (first on a logical level, then on a class level).
>>
>> It is difficult to list all type rules upfront, but it should behave
>> similar to all the work done in FLIP-65 and FLIP-95. I would move the
>> discussion about other type handling to the individual PRs. The general
>> goal should be to stay backwards compatible but reduce manual schema work.
>>
>> 3. "How do you derive schema from DataStream<Row>"
>>
>> We use RowTypeInfo (if DataStream comes from DataStream API) or
>> ExternalTypeInfo (if DataStream comes from Table API).
>>
>> 4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this
>> method is necessary"
>> Dealing with Row in DataStream API is very inconvenient. With the new
>> data format converters, the behavior would be consistent across
>> DataStream API and Table functions. The logic is already present and
>> seems to be pretty stable so far. We would break a lot of existing code
>> if we get rid of this method.
>>
>> 5. "How does Row behave like GenericRowData?"
>>
>> Row can contain StringData or further nested RowData. The data format
>> converters support that. The conversion of fields would be a no-op in
>> this case. In the end, both Row and GenericRowData just store an Object[].
>>
>> 6. "They would expect that all the fields they didn't set should be NULL."
>>
>> But this will be the case. The full list of all field names and their
>> order is defined by the data type, not the Row instance. During
>> serialization/conversion we can reorder fields, throw exceptions about
>> unknown field names, and set remaining fields to NULL.
>>
>> If a user uses `new Row(5)` but the serializer is configured by a data
>> type that only supports `Row(3)`, it will also throw an exception during
>> runtime. We cannot guard users from creating invalid rows. But the
>> strongly typed serializers/converters will do the final verification.
>>
>> Regards,
>> Timo
>>
>>
>> On 23.09.20 12:08, Kurt Young wrote:
>>> Sorry for being late, I went through the design doc and here are
>>> my comments:
>>>
>>> 1. A minor one, how about moving Schema after DataStream in all affected
>>> APIs? Such as:
>>> StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
>>> StreamTableEnvironment.createTemporaryView(String, Schema,
>> DataStream<T>):
>>> Unit
>>> StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>):
>> Table
>>> StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>
>>>
>>> It will look more aligned with APIs which don't have Schema. For example:
>>> StreamTableEnvironment.fromDataStream(DataStream<T>): Table
>>> StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table
>>>
>>> 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
>>> DataStream<T>): Table
>>> How do we convert the types between Schema and T, will we do some
>>> verification? Will we do some type coercion? For example,
>>> can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple,
>>> do we have some rules for setting field names in Schema?
>>> I can see lots of imagination from this method but the rules are unclear
>> to
>>> me.
>>>
>>> 3. A question to:
>> StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
>>> Table
>>> How do you derive schema from DataStream<Row>?
>>>
>>> 4. A question to:
>> StreamTableEnvironment.toDataStream(AbstractDataType<?>,
>>> Table): DataStream<T>
>>> I'm wondering whether this method is necessary. Always getting a
>>> DataStream<Row> from the table and then manually applying some
>>> map function seems not cumbersome, and safer (such intelligent
>>> conversion always seems error prone to me).
>>>
>>> 5.
>>>> The `toChangelogStream(Schema, Table)` exists for completeness to have a
>>> symmetric API.
>>>> It allows for declaring the data type for output similar to
>>> DynamicTableSinks.
>>>> Additionally, internal structures such as StringData, TimestampData can
>>> still be used by power users.
>>>> In that sense, Row can behave like a GenericRowData.
>>>
>>> How does Row behave like GenericRowData? I don't think Row can work with
>>> RowData for now.
>>>
>>> 6. Row.withNames() seems dangerous to me. It relies on the user setting all
>> the
>>> fields they need during `setField(String name, T value)`.
>>> It's also highly possible that users would not set certain fields when
>> for
>>> example some fields are NULL. They would expect that all the fields
>>> they didn't set should be NULL.
>>> Row.withNames(String[] fieldNames) or Row.withNames(List<String>
>>> fieldNames) seems to be a safer choice.
>>> I agree that simplicity is important but making API safer to use is also
>>> important.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <tw...@apache.org> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> thanks for your feedback. I removed `withNamesAndPositions` from the
>>>> public API list and added a comment that this is only internal API for
>>>> converters and serializers.
>>>>
>>>> I would start a new vote tomorrow if there are no objections.
>>>>
>>>> What do you think?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 23.09.20 08:55, Jark Wu wrote:
>>>>> Hi Timo,
>>>>>
>>>>> Sorry for the late reply.
>>>>> I think it would be great if we can make `withNamesAndPositions`
>> internally
>>>>> visible. This reduces the complexity of the public API.
>>>>> It's hard to come up with a perfect solution. So let's move on with this
>> FLIP.
>>>>> I don't have other concerns.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> the fieldNames map is not intended for users. I would also be fine to
>>>>>> make it a default scope constructor and access it with some internal
>>>>>> utility class next to the Row class. The fieldNames map must only be
>>>>>> used by serializers and converters. A user has no benefit in using it.
>>>>>>
>>>>>> For the creation of new rows (without reusing, which only advanced
>> users
>>>>>> usually do), I don't see a benefit of having:
>>>>>>
>>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>>>>> reuse.setField("myField", 12);
>>>>>> reuse.setField("myOtherField", "This is a test");
>>>>>>
>>>>>> The purpose of Row.withNames() is to create a Row easily and readably
>>>>>> without declaring 50+ column names or dealing with indices in this
>>>> range.
>>>>>>
>>>>>> Personally, I would like to make Row an interface and have concrete
>> row
>>>>>> implementations for different purposes but this would break existing
>>>>>> programs too much.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 18.09.20 11:04, Jark Wu wrote:
>>>>>>> Personally I think the fieldNames Map is confusing and not handy.
>>>>>>> I just have an idea but not sure what you think.
>>>>>>> What about adding a new constructor with List field names, this
>> enables
>>>>>> all
>>>>>>> name-based setter/getters.
>>>>>>> Regarding the List -> Map cost for every record, we can suggest users
>> to
>>>>>>> reuse the Row in the task.
>>>>>>>
>>>>>>> new Row(int arity)
>>>>>>> new Row(List<String> fieldNames)
>>>>>>>
>>>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>>>>>> reuse.setField("myField", 12);
>>>>>>> reuse.setField("myOtherField", "This is a test");
>>>>>>>
>>>>>>> My point is that, if we can have a handy constructor for named Row,
>> we
>>>>>> may
>>>>>>> not need to distinguish the named-only or positionAndNamed mode.
>>>>>>> This can avoid (by failing fast) the potential problem when setting an
>>>> invalid
>>>>>>> field.
>>>>>>>
>>>>>>> We can also come up with a new class for the field names which will
>>>>>>> construct the Map and be shared among all Row instances.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org>
>> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> thanks for all the feedback. I updated the FLIP again on Thursday to
>>>>>>>> integrate the feedback I got from Jingsong and Jark offline. In
>>>>>>>> particular I updated the `Improve dealing with Row in DataStream
>> API`
>>>>>>>> section another time. We introduced static methods for Row that
>> should
>>>>>>>> make the semantics clear to users:
>>>>>>>>
>>>>>>>> // allows to use index-based setters and getters (equivalent to new
>>>>>>>> Row(int))
>>>>>>>> // method exists for completeness
>>>>>>>> public static withPositions(int length);
>>>>>>>>
>>>>>>>> // allows to use name-based setters and getters
>>>>>>>> public static withNames();
>>>>>>>>
>>>>>>>> // allows to use both name-based and position-based setters and
>>>> getters
>>>>>>>> public static withNamesAndPositions(Map<String, Integer>
>> fieldNames);
>>>>>>>>
>>>>>>>> In any case, none of the existing methods will be deprecated and only
>>>>>>>> additional functionality will be available through the methods
>> above.
>>>>>>>>
>>>>>>>> I started a voting thread on Friday. Please feel free to vote.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> On 10.09.20 10:21, Danny Chan wrote:
>>>>>>>>> Thanks for driving this Timo, +1 for voting ~
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Danny Chan
>>>>>>>>> On 2020/09/10 3:54 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP
>> with
>>>>>> the
>>>>>>>>>> outcome. I think the result is one of the last core API
>> refactoring
>>>>>> and
>>>>>>>>>> users will be happy to have a consistent changelog support. Thanks
>>>> for
>>>>>>>>>> all the contributions.
>>>>>>>>>>
>>>>>>>>>> If there are no objections, I would continue with a voting.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 09.09.20 14:31, Danny Chan wrote:
>>>>>>>>>>> Thanks, I'm fine with that.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
>>>>>>>>>>>
>>>>>>>>>>>> I agree with Jark. It reduces confusion.
>>>>>>>>>>>>
>>>>>>>>>>>> The DataStream API doesn't know changelog processing at all. A
>>>>>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
>>>>>>>>>>>> `fromChangelogStream`. But only the latter API will interpret it
>>>> as
>>>>>> a
>>>>>>>>>>>> changelog.
>>>>>>>>>>>>
>>>>>>>>>>>> And as I mentioned before, the `toChangelogStream` must work
>> with
>>>>>> Row
>>>>>>>>>>>> otherwise users are confused due to duplicate records with a
>>>> missing
>>>>>>>>>>>> changeflag.
>>>>>>>>>>>>
>>>>>>>>>>>> I will update the FLIP-136 a last time. I hope we can then
>>>> continue
>>>>>>>> to a
>>>>>>>>>>>> vote.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>>>>>>>>>> I think it would bring in much confusion to use a different API
>>>>>>>>>>>>> name just because the DataStream generic type is different.
>>>>>>>>>>>>> If there is a ChangelogMode that only works for Row, can we have a
>>>>>>>>>>>>> type check there?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Switching to a new API name does not really solve the problem well;
>>>>>>>>>>>>> people still need to declare the ChangelogMode explicitly, and
>>>>>>>>>>>>> there are some confusions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
>>>>>>>>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>> On 2020/09/09 4:21 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>>>>>>>>>>> I had this in the initial design, but Jark had concerns at
>> least
>>>>>> for
>>>>>>>> the
>>>>>>>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
>>>>>>>> possible.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
>>>>>> retractions
>>>>>>>>>>>>>> cannot be represented for non-Rows and users will experience
>>>>>>>> duplicate
>>>>>>>>>>>>>> records with a missing changeflag.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>>>>>>>>>> “But I think the planner needs to
>>>>>>>>>>>>>>> know whether the input is insert-only or not.”
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> solve your concerns ? People can pass around whatever
>>>>>> ChangelogMode
>>>>>>>>>>>> they like as an optional param.
>>>>>>>>>>>>>>> By default: fromDataStream(dataStream, schema), the
>>>> ChangelogMode
>>>>>>>> is
>>>>>>>>>>>> INSERT.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>> On 2020/09/09 2:53 PM +0800, dev@flink.apache.org wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> But I think the planner needs to
>>>>>>>>>>>>>>>> know whether the input is insert-only or not.


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Kurt Young <yk...@gmail.com>.
Thanks for the detailed response; 1-5 sound good to me.

For #6, I just thought of another case that would also annoy users. Consider
code like this:

Row row = Row.withNames();
row.setField("a", 1);
row.setField("b", 2);

and the second time, they change the sequence of the setter calls:

Row row = Row.withNames();
row.setField("b", 2);
row.setField("a", 1);

I don't think anyone would expect these two rows to actually be different.

Instead, if we at least define the field names first, which fixes the order,
we would not have such side effects.
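
To illustrate, a minimal sketch (assuming the hypothetical
Row.withNames(List<String>) variant proposed above; Arrays is
java.util.Arrays):

Row row1 = Row.withNames(Arrays.asList("a", "b"));
row1.setField("a", 1);
row1.setField("b", 2);

Row row2 = Row.withNames(Arrays.asList("a", "b"));
row2.setField("b", 2); // different call order
row2.setField("a", 1);

// The declared list fixes the field layout up front, so both rows
// would be equal no matter in which order setField() is called.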

Best,
Kurt


On Wed, Sep 23, 2020 at 8:47 PM Timo Walther <tw...@apache.org> wrote:

> Hi Kurt,
>
> thanks for your feedback.
>
> 1. "moving Schema after DataStream": I don't have a strong opinion here.
> One could argue that the API would look similar to a CREATE TABLE
> statement: first schema then connector. I updated the FLIP.
>
> 2. "will we do some verification?"
> Yes, we will definitely do verification. It will happen based on what is
> available in TypeInformation.
>
> "if T is a Tuple, do we have some rules for setting field names in Schema?"
> The rule in this case would be to take the
> TupleTypeInfoBase.getFieldNames() similar to the logic we currently have.
>
> "Will we do some type coercion?"
> For `fromDataStream()`, type coercion between an explicitly specified
> Schema and DataStream will not happen (e.g. DataStream<Integer> !=
> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
> desired data type explicitly and expects correctness.
> For `toDataStream()`, it has similar type coercion semantics as a
> regular table sink (first on a logical level, then on a class level).
>
> It is difficult to list all type rules upfront, but it should behave
> similar to all the work done in FLIP-65 and FLIP-95. I would move the
> discussion about other type handling to the individual PRs. The general
> goal should be to stay backwards compatible but reduce manual schema work.
>
> 3. "How do you derive schema from DataStream<Row>"
>
> We use RowTypeInfo (if DataStream comes from DataStream API) or
> ExternalTypeInfo (if DataStream comes from Table API).
>
> 4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this
> method is necessary"
> Dealing with Row in DataStream API is very inconvenient. With the new
> data format converters, the behavior would be consistent across
> DataStream API and Table functions. The logic is already present and
> seems to be pretty stable so far. We would break a lot of existing code
> if we get rid of this method.
>
> 5. "How does Row behave like GenericRowData?"
>
> Row can contain StringData or further nested RowData. The data format
> converters support that. The conversion of fields would be a no-op in
> this case. In the end, both Row and GenericRowData just store an Object[].
>
> 6. "They would expect that all the fields they didn't set should be NULL."
>
> But this will be the case. The full list of all field names and their
> order is defined by the data type, not the Row instance. During
> serialization/conversion we can reorder fields, throw exceptions about
> unknown field names, and set remaining fields to NULL.
>
> If a user uses `new Row(5)` but the serializer is configured by a data
> type that only supports `Row(3)`, it will also throw an exception during
> runtime. We cannot guard users from creating invalid rows. But the
> strongly typed serializers/converters will do the final verification.
>
> Regards,
> Timo
>
>
> On 23.09.20 12:08, Kurt Young wrote:
> > Sorry for being late, I went through the design doc and here are
> > my comments:
> >
> > 1. A minor one, how about moving Schema after DataStream in all affected
> > APIs? Such as:
> > StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
> > StreamTableEnvironment.createTemporaryView(String, Schema,
> DataStream<T>):
> > Unit
> > StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>):
> Table
> > StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>
> >
> > It will look more aligned with APIs which don't have Schema. For example:
> > StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> > StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table
> >
> > 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
> > DataStream<T>): Table
> > How do we convert the types between Schema and T, will we do some
> > verification? Will we do some type coercion? For example,
> > can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple,
> > do we have some rules for setting field names in Schema?
> > I can see lots of imagination from this method but the rules are unclear
> to
> > me.
> >
> > 3. A question to:
> StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
> > Table
> > How do you derive schema from DataStream<Row>?
> >
> > 4. A question to:
> StreamTableEnvironment.toDataStream(AbstractDataType<?>,
> > Table): DataStream<T>
> > I'm wondering whether this method is necessary. Always getting a
> > DataStream<Row> from the table and then manually applying some
> > map function seems not too cumbersome and is safer (such intelligent
> > conversion always seems error-prone to me).
> >
> > 5.
> >> The `toChangelogStream(Schema, Table)` exists for completeness to have a
> > symmetric API.
> >> It allows for declaring the data type for output similar to
> > DynamicTableSinks.
> >> Additionally, internal structures such as StringData, TimestampData can
> > still be used by power users.
> >> In that sense, Row can behave like a GenericRowData.
> >
> > How does Row behave like GenericRowData? I don't think Row can work with
> > RowData for now.
> >
> > 6. Row.withNames() seems dangerous to me. It relies on the user setting all
> the
> > fields they need during `setField(String name, T value)`.
> > It's also highly possible that users would not set certain fields when
> for
> > example some fields are NULL. They would expect that all the fields
> > they didn't set should be NULL.
> > Row.withNames(String[] fieldNames) or Row.withNames(List<String>
> > fieldNames) seems to be a safer choice.
> > I agree that simplicity is important but making API safer to use is also
> > important.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <tw...@apache.org> wrote:
> >
> >> Hi Jark,
> >>
> >> thanks for your feedback. I removed `withNamesAndPositions` from the
> >> public API list and added a comment that this is only internal API for
> >> converters and serializers.
> >>
> >> I would start a new vote tomorrow if there are no objections.
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >> On 23.09.20 08:55, Jark Wu wrote:
> >>> Hi Timo,
> >>>
> >>> Sorry for the late reply.
> >>> I think it would be great if we can make `withNamesAndPositions`
> internally
> >>> visible. This reduces the complexity of the public API.
> >>> It's hard to come up with a perfect solution. So let's move on with this
> FLIP.
> >>> I don't have other concerns.
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org> wrote:
> >>>
> >>>> Hi Jark,
> >>>>
> >>>> the fieldNames map is not intended for users. I would also be fine to
> >>>> make it a default scope constructor and access it with some internal
> >>>> utility class next to the Row class. The fieldNames map must only be
> >>>> used by serializers and converters. A user has no benefit in using it.
> >>>>
> >>>> For the creation of new rows (without reusing, which only advanced
> users
> >>>> usually do), I don't see a benefit of having:
> >>>>
> >>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>>> reuse.setField("myField", 12);
> >>>> reuse.setField("myOtherField", "This is a test");
> >>>>
> >>>> The purpose of Row.withNames() is to create a Row easily and readably
> >>>> without declaring 50+ column names or dealing with indices in this
> >> range.
> >>>>
> >>>> Personally, I would like to make Row an interface and have concrete
> row
> >>>> implementations for different purposes but this would break existing
> >>>> programs too much.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 18.09.20 11:04, Jark Wu wrote:
> >>>>> Personally I think the fieldNames Map is confusing and not handy.
> >>>>> I just have an idea but not sure what you think.
> >>>>> What about adding a new constructor with List field names, this
> enables
> >>>> all
> >>>>> name-based setter/getters.
> >>>>> Regarding the List -> Map cost for every record, we can suggest users
> to
> >>>>> reuse the Row in the task.
> >>>>>
> >>>>> new Row(int arity)
> >>>>> new Row(List<String> fieldNames)
> >>>>>
> >>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>>>> reuse.setField("myField", 12);
> >>>>> reuse.setField("myOtherField", "This is a test");
> >>>>>
> >>>>> My point is that, if we can have a handy constructor for named Row,
> we
> >>>> may
> >>>>> not need to distinguish the named-only or positionAndNamed mode.
> >>>>> This can avoid (by failing fast) the potential problem when setting an
> >> invalid
> >>>>> field.
> >>>>>
> >>>>> We can also come up with a new class for the field names which will
> >>>>> construct the Map and be shared among all Row instances.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org>
> wrote:
> >>>>>
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> thanks for all the feedback. I updated the FLIP again on Thursday to
> >>>>>> integrate the feedback I got from Jingsong and Jark offline. In
> >>>>>> particular I updated the `Improve dealing with Row in DataStream
> API`
> >>>>>> section another time. We introduced static methods for Row that
> should
> >>>>>> make the semantics clear to users:
> >>>>>>
> >>>>>> // allows to use index-based setters and getters (equivalent to new
> >>>>>> Row(int))
> >>>>>> // method exists for completeness
> >>>>>> public static withPositions(int length);
> >>>>>>
> >>>>>> // allows to use name-based setters and getters
> >>>>>> public static withNames();
> >>>>>>
> >>>>>> // allows to use both name-based and position-based setters and
> >> getters
> >>>>>> public static withNamesAndPositions(Map<String, Integer>
> fieldNames);
> >>>>>>
> >>>>>> In any case, none of the existing methods will be deprecated and only
> >>>>>> additional functionality will be available through the methods
> above.
> >>>>>>
> >>>>>> I started a voting thread on Friday. Please feel free to vote.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>> On 10.09.20 10:21, Danny Chan wrote:
> >>>>>>> Thanks for driving this Timo, +1 for voting ~
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Danny Chan
> >>>>>>> On 2020/09/10 3:54 PM +0800, Timo Walther <tw...@apache.org> wrote:
> >>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP
> with
> >>>> the
> >>>>>>>> outcome. I think the result is one of the last core API
> refactoring
> >>>> and
> >>>>>>>> users will be happy to have a consistent changelog support. Thanks
> >> for
> >>>>>>>> all the contributions.
> >>>>>>>>
> >>>>>>>> If there are no objections, I would continue with a voting.
> >>>>>>>>
> >>>>>>>> What do you think?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 09.09.20 14:31, Danny Chan wrote:
> >>>>>>>>> Thanks, I'm fine with that.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
> >>>>>>>>>
> >>>>>>>>>> I agree with Jark. It reduces confusion.
> >>>>>>>>>>
> >>>>>>>>>> The DataStream API doesn't know changelog processing at all. A
> >>>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
> >>>>>>>>>> `fromChangelogStream`. But only the latter API will interpret it
> >> as
> >>>> a
> >>>>>>>>>> changelog.
> >>>>>>>>>>
> >>>>>>>>>> And as I mentioned before, the `toChangelogStream` must work
> with
> >>>> Row
> >>>>>>>>>> otherwise users are confused due to duplicate records with a
> >> missing
> >>>>>>>>>> changeflag.
> >>>>>>>>>>
> >>>>>>>>>> I will update the FLIP-136 a last time. I hope we can then
> >> continue
> >>>>>> to a
> >>>>>>>>>> vote.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>>>>>>>> I think it would bring in much confusion to use a different API
> >>>>>>>>>>> name just because the DataStream generic type is different.
> >>>>>>>>>>> If there is a ChangelogMode that only works for Row, can we have a
> >>>>>>>>>>> type check there?
> >>>>>>>>>>>
> >>>>>>>>>>> Switching to a new API name does not really solve the problem well;
> >>>>>>>>>>> people still need to declare the ChangelogMode explicitly, and
> >>>>>>>>>>> there are some confusions:
> >>>>>>>>>>>
> >>>>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
> >>>>>>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Danny Chan
> >>>>>>>>>>> On 2020/09/09 4:21 PM +0800, Timo Walther <tw...@apache.org> wrote:
> >>>>>>>>>>>> I had this in the initial design, but Jark had concerns at
> least
> >>>> for
> >>>>>> the
> >>>>>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>>>>>>>
> >>>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> >>>>>> possible.
> >>>>>>>>>>>>
> >>>>>>>>>>>> But in this case I would vote for a symmetric API. If we keep
> >>>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>>>>>>>
> >>>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
> >>>> retractions
> >>>>>>>>>>>> cannot be represented for non-Rows and users will experience
> >>>>>> duplicate
> >>>>>>>>>>>> records with a missing changeflag.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>>>>>>>> “But I think the planner needs to
> >>>>>>>>>>>>> know whether the input is insert-only or not.”
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> solve your concerns ? People can pass around whatever
> >>>> ChangelogMode
> >>>>>>>>>> they like as an optional param.
> >>>>>>>>>>>>> By default: fromDataStream(dataStream, schema), the
> >> ChangelogMode
> >>>>>> is
> >>>>>>>>>> INSERT.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>> On 2020/09/09 2:53 PM +0800, dev@flink.apache.org wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> But I think the planner needs to
> >>>>>>>>>>>>>> know whether the input is insert-only or not.

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Kurt,

thanks for your feedback.

1. "moving Schema after DataStream": I don't have a strong opinion here. 
One could argue that the API would look similar to a CREATE TABLE 
statement: first schema then connector. I updated the FLIP.

2. "will we do some verification?"
Yes, we will definitely do verification. It will happen based on what is 
available in TypeInformation.

"if T is a Tuple, do we have some rules for setting field names in Schema?"
The rule in this case would be to take the 
TupleTypeInfoBase.getFieldNames() similar to the logic we currently have.
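
A small sketch of what that would mean in practice (the derived column
names are my reading of the current logic; env and tableEnv are the
usual stream and table environments):

DataStream<Tuple2<Integer, String>> stream =
    env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

// No explicit Schema given: the column names are taken from
// TupleTypeInfoBase.getFieldNames(), i.e. f0 and f1.
Table table = tableEnv.fromDataStream(stream);
table.printSchema(); // expected: f0 INT, f1 STRING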

"Will we do some type coercion?"
For `fromDataStream()`, type coercion between an explicitly specified 
Schema and DataStream will not happen (e.g. DataStream<Integer> != 
Schema.column("f", DataTypes.BIGINT())). Because the user specified the 
desired data type explicitly and expects correctness.
For `toDataStream()`, it has similar type coercion semantics as a 
regular table sink (first on a logical level, then on a class level).

It is difficult to list all type rules upfront, but it should behave 
similar to all the work done in FLIP-65 and FLIP-95. I would move the 
discussion about other type handling to the individual PRs. The general 
goal should be to stay backwards compatible but reduce manual schema work.
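
As a sketch of the intended behavior (assuming a builder-style API for
the Schema class proposed in this FLIP):

DataStream<Integer> stream = env.fromElements(1, 2, 3);

// OK: the declared data type matches the physical type of the stream.
tableEnv.fromDataStream(stream,
    Schema.newBuilder().column("f0", DataTypes.INT()).build());

// Expected to fail: BIGINT was declared explicitly and no implicit
// INT -> BIGINT coercion happens on the input side.
tableEnv.fromDataStream(stream,
    Schema.newBuilder().column("f0", DataTypes.BIGINT()).build());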

3. "How do you derive schema from DataStream<Row>"

We use RowTypeInfo (if DataStream comes from DataStream API) or 
ExternalTypeInfo (if DataStream comes from Table API).

4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this 
method is necessary"
Dealing with Row in DataStream API is very inconvenient. With the new 
data format converters, the behavior would be consistent across 
DataStream API and Table functions. The logic is already present and 
seems to be pretty stable so far. We would break a lot of existing code 
if we get rid of this method.

5. "How does Row behave like GenericRowData?"

Row can contain StringData or further nested RowData. The data format 
converters support that. The conversion of fields would be a no-op in 
this case. In the end, both Row and GenericRowData just store an Object[].
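
A short sketch of such a no-op case (StringData and TimestampData are
the internal structures from org.apache.flink.table.data):

Row row = new Row(2);
row.setField(0, StringData.fromString("hello"));
row.setField(1, TimestampData.fromEpochMillis(0L));

// The data format converters can hand both fields over as-is; no
// per-field conversion is necessary, just like with GenericRowData.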

6. "They would expect that all the fields they didn't set should be NULL."

But this will be the case. The full list of all field names and their 
order is defined by the data type, not the Row instance. During 
serialization/conversion we can reorder fields, throw exceptions about 
unknown field names, and set remaining fields to NULL.

If a user uses `new Row(5)` but the serializer is configured by a data 
type that only supports `Row(3)`, it will also throw an exception during 
runtime. We cannot guard users from creating invalid rows. But the 
strongly typed serializers/converters will do the final verification.
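
To make the semantics concrete, a sketch (using the factory methods
proposed in this FLIP):

// serializer/converter configured with ROW<a INT, b STRING, c STRING>
Row row = Row.withNames();
row.setField("a", 1);
row.setField("c", "x");
// "b" is never set

// During serialization/conversion:
// - fields are looked up by name and reordered to (a, b, c)
// - the unset field "b" is completed as NULL
// - an unknown field name would lead to an exception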

Regards,
Timo


On 23.09.20 12:08, Kurt Young wrote:
> Sorry for being late, I went through the design doc and here are
> my comments:
> 
> 1. A minor one, how about moving Schema after DataStream in all affected
> APIs? Such as:
> StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
> StreamTableEnvironment.createTemporaryView(String, Schema, DataStream<T>):
> Unit
> StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>): Table
> StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>
> 
> It will look more aligned with APIs which don't have Schema. For example:
> StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table
> 
> 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
> DataStream<T>): Table
> How do we convert the types between Schema and T, will we do some
> verification? Will we do some type coercion? For example,
> can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple,
> do we have some rules for setting field names in Schema?
> I can see lots of imagination from this method but the rules are unclear to
> me.
> 
> 3. A question to: StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
> Table
> How do you derive schema from DataStream<Row>?
> 
> 4. A question to: StreamTableEnvironment.toDataStream(AbstractDataType<?>,
> Table): DataStream<T>
> I'm wondering whether this method is necessary. Always getting a
> DataStream<Row> from the table and then manually applying some
> map function seems not too cumbersome and is safer (such intelligent
> conversion always seems error-prone to me).
> 
> 5.
>> The `toChangelogStream(Schema, Table)` exists for completeness to have a
> symmetric API.
>> It allows for declaring the data type for output similar to
> DynamicTableSinks.
>> Additionally, internal structures such as StringData, TimestampData can
> still be used by power users.
>> In that sense, Row can behave like a GenericRowData.
> 
> How does Row behave like GenericRowData? I don't think Row can work with
> RowData for now.
> 
> 6. Row.withNames() seems dangerous to me. It relies on the user setting all the
> fields they need during `setField(String name, T value)`.
> It's also highly possible that users would not set certain fields when for
> example some fields are NULL. They would expect that all the fields
> they didn't set should be NULL.
> Row.withNames(String[] fieldNames) or Row.withNames(List<String>
> fieldNames) seems to be a safer choice.
> I agree that simplicity is important but making API safer to use is also
> important.
> 
> Best,
> Kurt
> 
> 
> On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <tw...@apache.org> wrote:
> 
>> Hi Jark,
>>
>> thanks for your feedback. I removed `withNamesAndPositions` from the
>> public API list and added a comment that this is only internal API for
>> converters and serializers.
>>
>> I would start a new vote tomorrow if there are no objections.
>>
>> What do you think?
>>
>> Regards,
>> Timo
>>
>> On 23.09.20 08:55, Jark Wu wrote:
>>> Hi Timo,
>>>
>>> Sorry for the late reply.
>>> I think it would be great if we can make `withNamesAndPositions` internally
>>> visible. This reduces the complexity of the public API.
>>> It's hard to come up with a perfect solution. So let's move on with this FLIP.
>>> I don't have other concerns.
>>>
>>> Best,
>>> Jark
>>>
>>> On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> the fieldNames map is not intended for users. I would also be fine to
>>>> make it a default scope constructor and access it with some internal
>>>> utility class next to the Row class. The fieldNames map must only be
>>>> used by serializers and converters. A user has no benefit in using it.
>>>>
>>>> For the creation of new rows (without reusing, which only advanced users
>>>> usually do), I don't see a benefit of having:
>>>>
>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>>> reuse.setField("myField", 12);
>>>> reuse.setField("myOtherField", "This is a test");
>>>>
>>>> The purpose of Row.withNames() is to create a Row easily and readably
>>>> without declaring 50+ column names or dealing with indices in this
>> range.
>>>>
>>>> Personally, I would like to make Row an interface and have concrete row
>>>> implementations for different purposes but this would break existing
>>>> programs too much.
>>>>
>>>> What do you think?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 18.09.20 11:04, Jark Wu wrote:
>>>>> Personally I think the fieldNames Map is confusing and not handy.
>>>>> I just have an idea but not sure what you think.
>>>>> What about adding a new constructor with List field names, this enables
>>>> all
>>>>> name-based setter/getters.
>>>>> Regarding the List -> Map cost for every record, we can suggest users to
>>>>> reuse the Row in the task.
>>>>>
>>>>> new Row(int arity)
>>>>> new Row(List<String> fieldNames)
>>>>>
>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>>>> reuse.setField("myField", 12);
>>>>> reuse.setField("myOtherField", "This is a test");
>>>>>
>>>>> My point is that, if we can have a handy constructor for named Row, we
>>>> may
>>>>> not need to distinguish the named-only or positionAndNamed mode.
>>>>> This can avoid (by failing fast) the potential problem when setting an
>> invalid
>>>>> field.
>>>>>
>>>>> We can also come up with a new class for the field names which will
>>>>> construct the Map and be shared among all Row instances.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> thanks for all the feedback. I updated the FLIP again on Thursday to
>>>>>> integrate the feedback I got from Jingsong and Jark offline. In
>>>>>> particular I updated the `Improve dealing with Row in DataStream API`
>>>>>> section another time. We introduced static methods for Row that should
>>>>>> make the semantics clear to users:
>>>>>>
>>>>>> // allows to use index-based setters and getters (equivalent to new
>>>>>> Row(int))
>>>>>> // method exists for completeness
>>>>>> public static withPositions(int length);
>>>>>>
>>>>>> // allows to use name-based setters and getters
>>>>>> public static withNames();
>>>>>>
>>>>>> // allows to use both name-based and position-based setters and
>> getters
>>>>>> public static withNamesAndPositions(Map<String, Integer> fieldNames);
>>>>>>
>>>>>> In any case, none of the existing methods will be deprecated and only
>>>>>> additional functionality will be available through the methods above.
>>>>>>
>>>>>> I started a voting thread on Friday. Please feel free to vote.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>> On 10.09.20 10:21, Danny Chan wrote:
>>>>>>> Thanks for driving this Timo, +1 for voting ~
>>>>>>>
>>>>>>> Best,
>>>>>>> Danny Chan
>>>>>>> On 2020/09/10 3:54 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with
>>>> the
>>>>>>>> outcome. I think the result is one of the last core API refactoring
>>>> and
>>>>>>>> users will be happy to have a consistent changelog support. Thanks
>> for
>>>>>>>> all the contributions.
>>>>>>>>
>>>>>>>> If there are no objections, I would continue with a voting.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>>>> On 09.09.20 14:31, Danny Chan wrote:
>>>>>>>>> Thanks, I'm fine with that.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
>>>>>>>>>
>>>>>>>>>> I agree with Jark. It reduces confusion.
>>>>>>>>>>
>>>>>>>>>> The DataStream API doesn't know changelog processing at all. A
>>>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
>>>>>>>>>> `fromChangelogStream`. But only the latter API will interpret it
>> as
>>>> a
>>>>>>>>>> changelog.
>>>>>>>>>>
>>>>>>>>>> And as I mentioned before, the `toChangelogStream` must work with
>>>> Row
>>>>>>>>>> otherwise users are confused due to duplicate records with a
>> missing
>>>>>>>>>> changeflag.
>>>>>>>>>>
>>>>>>>>>> I will update the FLIP-136 a last time. I hope we can then
>> continue
>>>>>> to a
>>>>>>>>>> vote.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>>>>>>>> I think it would bring in much confusion to use a different API
>>>>>>>>>>> name just because the DataStream generic type is different.
>>>>>>>>>>> If there is a ChangelogMode that only works for Row, can we have a
>>>>>>>>>>> type check there?
>>>>>>>>>>>
>>>>>>>>>>> Switching to a new API name does not really solve the problem well;
>>>>>>>>>>> people still need to declare the ChangelogMode explicitly, and
>>>>>>>>>>> there are some confusions:
>>>>>>>>>>>
>>>>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
>>>>>>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Danny Chan
>>>>>>>>>>> On 2020/09/09 4:21 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>>>>>>>>> I had this in the initial design, but Jark had concerns at least
>>>> for
>>>>>> the
>>>>>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>>>>>>>
>>>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
>>>>>> possible.
>>>>>>>>>>>>
>>>>>>>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>>>>>>>
>>>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
>>>> retractions
>>>>>>>>>>>> cannot be represented for non-Rows and users will experience
>>>>>> duplicate
>>>>>>>>>>>> records with a missing changeflag.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>>>>>>>> “But I think the planner needs to
>>>>>>>>>>>>> know whether the input is insert-only or not.”
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>>>>>>>
>>>>>>>>>>>>> solve your concerns ? People can pass around whatever
>>>> ChangelogMode
>>>>>>>>>> they like as an optional param.
>>>>>>>>>>>>> By default: fromDataStream(dataStream, schema), the
>> ChangelogMode
>>>>>> is
>>>>>>>>>> INSERT.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>> On 2020/09/09 2:53 PM +0800, dev@flink.apache.org wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But I think the planner needs to
>>>>>>>>>>>>>> know whether the input is insert-only or not.


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Kurt Young <yk...@gmail.com>.
Sorry for being late, I went through the design doc and here are
my comments:

1. A minor one, how about moving Schema after DataStream in all affected
APIs? Such as:
StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
StreamTableEnvironment.createTemporaryView(String, Schema, DataStream<T>):
Unit
StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>): Table
StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>

It will look more aligned with APIs which don't have Schema. For example:
StreamTableEnvironment.fromDataStream(DataStream<T>): Table
StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table

2. A question to: StreamTableEnvironment.fromDataStream(Schema,
DataStream<T>): Table
How do we convert the types between Schema and T, will we do some
verification? Will we do some type coercion? For example,
can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple,
do we have some rules for setting field names in Schema?
I can see lots of imagination from this method but the rules are unclear to
me.

3. A question to: StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
Table
How do you derive schema from DataStream<Row>?

4. A question to: StreamTableEnvironment.toDataStream(AbstractDataType<?>,
Table): DataStream<T>
I'm wondering whether this method is necessary. Always getting a
DataStream<Row> from the table and then manually applying some
map function seems not too cumbersome and is safer (such intelligent
conversion always seems error-prone to me).
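
For reference, the manual alternative I have in mind looks roughly like
this sketch (User is a made-up POJO, table an arbitrary Table; the plain
toDataStream(Table) returning DataStream<Row> is taken from the FLIP):

DataStream<User> users = tableEnv
    .toDataStream(table) // DataStream<Row>
    .map(row -> new User(
        (String) row.getField(0),
        (Integer) row.getField(1)))
    .returns(User.class);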

5.
> The `toChangelogStream(Schema, Table)` exists for completeness to have a
symmetric API.
> It allows for declaring the data type for output similar to
DynamicTableSinks.
> Additionally, internal structures such as StringData, TimestampData can
still be used by power users.
> In that sense, Row can behave like a GenericRowData.

How does Row behave like GenericRowData? I don't think Row can work with
RowData for now.

6. Row.withNames() seems dangerous to me. It relies on the user setting all the
fields they need during `setField(String name, T value)`.
It's also highly possible that users would not set certain fields when for
example some fields are NULL. They would expect that all the fields
they didn't set should be NULL.
Row.withNames(String[] fieldNames) or Row.withNames(List<String>
fieldNames) seems to be a safer choice.
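
A sketch of how the safer variant could look:

Row row = Row.withNames(Arrays.asList("a", "b", "c"));
row.setField("a", 1);
// "b" and "c" are declared but unset, so treating them as NULL is well
// defined, and a typo such as row.setField("d", 2) could fail fast.
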
I agree that simplicity is important but making API safer to use is also
important.

Best,
Kurt


On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <tw...@apache.org> wrote:

> Hi Jark,
>
> thanks for your feedback. I removed `withNamesAndPositions` from the
> public API list and added a comment that this is only internal API for
> converters and serializers.
>
> I would start a new vote tomorrow if there are no objections.
>
> What do you think?
>
> Regards,
> Timo
>
> On 23.09.20 08:55, Jark Wu wrote:
> > Hi Timo,
> >
> > Sorry for the late reply.
> > I think it would be great if we can make `withNamesAndPositions` internally
> > visible. This reduces the complexity of the public API.
> > It's hard to come up with a perfect solution. So let's move on with this FLIP.
> > I don't have other concerns.
> >
> > Best,
> > Jark
> >
> > On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org> wrote:
> >
> >> Hi Jark,
> >>
> >> the fieldNames map is not intended for users. I would also be fine to
> >> make it a default scope constructor and access it with some internal
> >> utility class next to the Row class. The fieldNames map must only be
> >> used by serializers and converters. A user has no benefit in using it.
> >>
> >> For the creation of new rows (without reusing, which only advanced users
> >> usually do), I don't see a benefit of having:
> >>
> >> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >> reuse.setField("myField", 12);
> >> reuse.setField("myOtherField", "This is a test");
> >>
> >> The purpose of Row.withNames() is to create a Row easily and readably
> >> without declaring 50+ column names or dealing with indices in this
> range.
> >>
> >> Personally, I would like to make Row an interface and have concrete row
> >> implementations for different purposes but this would break existing
> >> programs too much.
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 18.09.20 11:04, Jark Wu wrote:
> >>> Personally I think the fieldNames Map is confusing and not handy.
> >>> I just have an idea but not sure what you think.
> >>> What about adding a new constructor with List field names, this enables
> >> all
> >>> name-based setter/getters.
> >>> Regarding the List -> Map cost for every record, we can suggest users to
> >>> reuse the Row in the task.
> >>>
> >>> new Row(int arity)
> >>> new Row(List<String> fieldNames)
> >>>
> >>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>> reuse.setField("myField", 12);
> >>> reuse.setField("myOtherField", "This is a test");
> >>>
> >>> My point is that, if we can have a handy constructor for named Row, we
> >> may
> >>> not need to distinguish the named-only or positionAndNamed mode.
> >>> This can avoid (by failing fast) the potential problem when setting an
> invalid
> >>> field.
> >>>
> >>> We can also come up with a new class for the field names which will
> >>> construct the Map and be shared among all Row instances.
> >>>
> >>> What do you think?
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org> wrote:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> thanks for all the feedback. I updated the FLIP again on Thursday to
> >>>> integrate the feedback I got from Jingsong and Jark offline. In
> >>>> particular I updated the `Improve dealing with Row in DataStream API`
> >>>> section another time. We introduced static methods for Row that should
> >>>> make the semantics clear to users:
> >>>>
> >>>> // allows to use index-based setters and getters (equivalent to new
> >>>> Row(int))
> >>>> // method exists for completeness
> >>>> public static withPositions(int length);
> >>>>
> >>>> // allows to use name-based setters and getters
> >>>> public static withNames();
> >>>>
> >>>> // allows to use both name-based and position-based setters and
> getters
> >>>> public static withNamesAndPositions(Map<String, Integer> fieldNames);
> >>>>
> >>>> In any case, none of the existing methods will be deprecated and only
> >>>> additional functionality will be available through the methods above.
> >>>>
> >>>> I started a voting thread on Friday. Please feel free to vote.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 10.09.20 10:21, Danny Chan wrote:
> >>>>> Thanks for driving this Timo, +1 for voting ~
> >>>>>
> >>>>> Best,
> >>>>> Danny Chan
> >>>>> On 2020/09/10 3:54 PM +0800, Timo Walther <tw...@apache.org> wrote:
> >>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with
> >> the
> >>>>>> outcome. I think the result is one of the last core API refactoring
> >> and
> >>>>>> users will be happy to have a consistent changelog support. Thanks
> for
> >>>>>> all the contributions.
> >>>>>>
> >>>>>> If there are no objections, I would continue with a voting.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 09.09.20 14:31, Danny Chan wrote:
> >>>>>>> Thanks, I'm fine with that.
> >>>>>>>
> >>>>>>>
> >>>>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
> >>>>>>>
> >>>>>>>> I agree with Jark. It reduces confusion.
> >>>>>>>>
> >>>>>>>> The DataStream API doesn't know changelog processing at all. A
> >>>>>>>> DataStream of Row can be used with both `fromDataStream` and
> >>>>>>>> `fromChangelogStream`. But only the latter API will interpret it
> as
> >> a
> >>>>>>>> changelog.
> >>>>>>>>
> >>>>>>>> And as I mentioned before, the `toChangelogStream` must work with
> >> Row
> >>>>>>>> otherwise users are confused due to duplicate records with a
> missing
> >>>>>>>> changeflag.
> >>>>>>>>
> >>>>>>>> I will update the FLIP-136 a last time. I hope we can then
> continue
> >>>> to a
> >>>>>>>> vote.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>>>>>> I think it would bring in much confusion to use a different API
> >>>>>>>>> name just because the DataStream generic type is different.
> >>>>>>>>> If there is a ChangelogMode that only works for Row, can we have a
> >>>>>>>>> type check there?
> >>>>>>>>>
> >>>>>>>>> Switching to a new API name does not really solve the problem well;
> >>>>>>>>> people still need to declare the ChangelogMode explicitly, and
> >>>>>>>>> there are some confusions:
> >>>>>>>>>
> >>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
> >>>>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Danny Chan
> >>>>>>>>> On 2020/09/09 4:21 PM +0800, Timo Walther <tw...@apache.org> wrote:
> >>>>>>>>>> I had this in the initial design, but Jark had concerns at least
> >> for
> >>>> the
> >>>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>>>>>
> >>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> >>>> possible.
> >>>>>>>>>>
> >>>>>>>>>> But in this case I would vote for a symmetric API. If we keep
> >>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>>>>>
> >>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
> >> retractions
> >>>>>>>>>> cannot be represented for non-Rows and users will experience
> >>>> duplicate
> >>>>>>>>>> records with a missing changeflag.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>>>>>> “But I think the planner needs to
> >>>>>>>>>>> know whether the input is insert-only or not.”
> >>>>>>>>>>>
> >>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>>>>>
> >>>>>>>>>>> solve your concerns ? People can pass around whatever
> >> ChangelogMode
> >>>>>>>> they like as an optional param.
> >>>>>>>>>>> By default: fromDataStream(dataStream, schema), the
> ChangelogMode
> >>>> is
> >>>>>>>> INSERT.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Danny Chan
> >>>>>>>>>>> On 2020/09/09 2:53 PM +0800, dev@flink.apache.org wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> But I think the planner needs to
> >>>>>>>>>>>> know whether the input is insert-only or not.

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Jark,

thanks for your feedback. I removed `withNamesAndPositions` from the 
public API list and added a comment that this is only internal API for 
converters and serializers.
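
For illustration, the internal-only usage could then look roughly like
this sketch inside a converter (the exact placement of the method is an
implementation detail; Map/HashMap are from java.util):

// field name -> position mapping, built once and shared by the converter
Map<String, Integer> positions = new HashMap<>();
positions.put("name", 0);
positions.put("score", 1);

Row row = Row.withNamesAndPositions(positions);
row.setField("name", "Bob"); // name-based access
row.setField(1, 42);         // position-based access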

I would start a new vote tomorrow if there are no objections.

What do you think?

Regards,
Timo

On 23.09.20 08:55, Jark Wu wrote:
> Hi Timo,
> 
> Sorry for the late reply.
> I think it would be great if we can make `withNamesAndPositions` internally
> visible. This reduces the complexity of the public API.
> It's hard to come up with a perfect solution. So let's move on with this FLIP.
> I don't have other concerns.
> 
> Best,
> Jark
> 
> On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org> wrote:
> 
>> Hi Jark,
>>
>> the fieldNames map is not intended for users. I would also be fine to
>> make it a default scope constructor and access it with some internal
>> utility class next to the Row class. The fieldNames map must only be
>> used by serializers and converters. A user has no benefit in using it.
>>
>> For the creation of new rows (without reusing, which only advanced users
>> usually do), I don't see a benefit of having:
>>
>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>> reuse.setField("myField", 12);
>> reuse.setField("myOtherField", "This is a test");
>>
>> The purpose of Row.withNames() is to create a Row easily and readably
>> without declaring 50+ column names or dealing with indices in this range.
>>
>> Personally, I would like to make Row an interface and have concrete row
>> implementations for different purposes but this would break existing
>> programs too much.
>>
>> What do you think?
>>
>> Regards,
>> Timo
>>
>>
>> On 18.09.20 11:04, Jark Wu wrote:
>>> Personally I think the fieldNames Map is confusing and not handy.
>>> I just have an idea but not sure what you think.
>>> What about adding a new constructor with List field names, this enables
>> all
>>> name-based setter/getters.
>>> Regarding the List -> Map cost for every record, we can suggest users to
>>> reuse the Row in the task.
>>>
>>> new Row(int arity)
>>> new Row(List<String> fieldNames)
>>>
>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>> reuse.setField("myField", 12);
>>> reuse.setField("myOtherField", "This is a test");
>>>
>>> My point is that, if we can have a handy constructor for named Row, we
>> may
>>> not need to distinguish the named-only or positionAndNamed mode.
>>> This can avoid (by failing fast) the potential problem when setting an invalid
>>> field.
>>>
>>> We can also come up with a new class for the field names which will
>>> construct the Map and be shared among all Row instances.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> thanks for all the feedback. I updated the FLIP again on Thursday to
>>>> integrate the feedback I got from Jingsong and Jark offline. In
>>>> particular I updated the `Improve dealing with Row in DataStream API`
>>>> section another time. We introduced static methods for Row that should
>>>> make the semantics clear to users:
>>>>
>>>> // allows to use index-based setters and getters (equivalent to new
>>>> Row(int))
>>>> // method exists for completeness
>>>> public static withPositions(int length);
>>>>
>>>> // allows to use name-based setters and getters
>>>> public static withNames();
>>>>
>>>> // allows to use both name-based and position-based setters and getters
>>>> public static withNamesAndPositions(Map<String, Integer> fieldNames);
>>>>
>>>> In any case, none of the existing methods will be deprecated and only
>>>> additional functionality will be available through the methods above.
>>>>
>>>> I started a voting thread on Friday. Please feel free to vote.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 10.09.20 10:21, Danny Chan wrote:
>>>>> Thanks for driving this Timo, +1 for voting ~
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> On 2020/09/10 3:54 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with
>> the
>>>>>> outcome. I think the result is one of the last core API refactoring
>> and
>>>>>> users will be happy to have a consistent changelog support. Thanks for
>>>>>> all the contributions.
>>>>>>
>>>>>> If there are no objections, I would continue with a voting.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.09.20 14:31, Danny Chan wrote:
>>>>>>> Thanks, I'm fine with that.
>>>>>>>
>>>>>>>
>>>>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
>>>>>>>
>>>>>>>> I agree with Jark. It reduces confusion.
>>>>>>>>
>>>>>>>> The DataStream API doesn't know changelog processing at all. A
>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
>>>>>>>> `fromChangelogStream`. But only the latter API will interpret it as
>> a
>>>>>>>> changelog.
>>>>>>>>
>>>>>>>> And as I mentioned before, the `toChangelogStream` must work with
>> Row
>>>>>>>> otherwise users are confused due to duplicate records with a missing
>>>>>>>> changeflag.
>>>>>>>>
>>>>>>>> I will update the FLIP-136 a last time. I hope we can then continue
>>>> to a
>>>>>>>> vote.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>>>>>> I think it would bring in much confusion to use a different API
>>>>>>>>> name just because the DataStream generic type is different.
>>>>>>>>> If there is a ChangelogMode that only works for Row, can we have a
>>>>>>>>> type check there?
>>>>>>>>>
>>>>>>>>> Switching to a new API name does not really solve the problem well;
>>>>>>>>> people still need to declare the ChangelogMode explicitly, and
>>>>>>>>> there are some confusions:
>>>>>>>>>
>>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
>>>>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Danny Chan
>>>>>>>>> On 2020/09/09 4:21 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>>>>>>> I had this in the initial design, but Jark had concerns at least
>> for
>>>> the
>>>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>>>>>
>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
>>>> possible.
>>>>>>>>>>
>>>>>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>>>>>
>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
>> retractions
>>>>>>>>>> cannot be represented for non-Rows and users will experience
>>>> duplicate
>>>>>>>>>> records with a missing changeflag.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>>>>>> “But I think the planner needs to
>>>>>>>>>>> know whether the input is insert-only or not.”
>>>>>>>>>>>
>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>>>>>
>>>>>>>>>>> solve your concerns ? People can pass around whatever
>> ChangelogMode
>>>>>>>> they like as an optional param.
>>>>>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
>>>> is
>>>>>>>> INSERT.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Danny Chan
>>>>>>>>>>> On 2020/09/09 2:53 PM +0800, dev@flink.apache.org wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> But I think the planner needs to
>>>>>>>>>>>> know whether the input is insert-only or not.


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Hi Timo,

Sorry for the late reply.
I think it would be great if we could make `withNamesAndPositions` internally
visible. This reduces the complexity of the public API.
It's hard to come up with a perfect solution, so let's move on with this FLIP.
I don't have other concerns.

Best,
Jark

On Fri, 18 Sep 2020 at 22:14, Timo Walther <tw...@apache.org> wrote:

> Hi Jark,
>
> the fieldNames map is not intended for users. I would also be fine to
> make it a default scope constructor and access it with some internal
> utility class next to the Row class. The fieldNames map must only be
> used by serializers and converters. A user has no benefit in using it.
>
> For the creation of new rows (without reusing, which only advanced users
> usually do), I don't see a benefit of having:
>
> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> reuse.setField("myField", 12);
> reuse.setField("myOtherField", "This is a test");
>
> The purpose of Row.withName() is too create a Row easily and readable
> without declaring 50+ column names or dealing with indices in this range.
>
> Personally, I would like to make Row an interface and have concrete row
> implementations for different purposes but this would break existing
> programs too much.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 18.09.20 11:04, Jark Wu wrote:
> > Personally I think the fieldNames Map is confusing and not handy.
> > I just have an idea but not sure what you think.
> > What about adding a new constructor with List field names, this enables
> all
> > name-based setter/getters.
> > Regarding to List -> Map cost for every record, we can suggest users to
> > reuse the Row in the task.
> >
> > new Row(int arity)
> > new Row(List<String> fieldNames)
> >
> > final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> > reuse.setField("myField", 12);
> > reuse.setField("myOtherField", "This is a test");
> >
> > My point is that, if we can have a handy constructor for named Row, we
> may
> > not need to distinguish the named-only or positionAndNamed mode.
> > This can avoid (fast-fail) the potential problem when setting an invalid
> > field.
> >
> > We can also come up with a new class for the field names which will
> > construct the Map and be shared among all Row instances.
> >
> > What do you think?
> >
> > Best,
> > Jark
> >
> > On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> thanks for all the feedback. I updated the FLIP again on Thursday to
> >> integrate the feedback I got from Jingsong and Jark offline. In
> >> particular I updated the `Improve dealing with Row in DataStream API`
> >> section another time. We introduced static methods for Row that should
> >> make the semantics clear to users:
> >>
> >> // allows to use index-based setters and getters (equivalent to new
> >> Row(int))
> >> // method exists for completeness
> >> public static withPositions(int length);
> >>
> >> // allows to use name-based setters and getters
> >> public static withNames();
> >>
> >> // allows to use both name-based and position-based setters and getters
> >> public static withNamesAndPositions(Map<String, Integer> fieldNames);
> >>
> >> In any case, non of the existing methods will be deprecated and only
> >> additional functionality will be available through the methods above.
> >>
> >> I started a voting thread on Friday. Please feel free to vote.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 10.09.20 10:21, Danny Chan wrote:
> >>> Thanks for driving this Timo, +1 for voting ~
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年9月10日 +0800 PM3:54,Timo Walther <tw...@apache.org>,写道:
> >>>> Thanks everyone for this healthy discussion. I updated the FLIP with
> the
> >>>> outcome. I think the result is one of the last core API refactoring
> and
> >>>> users will be happy to have a consistent changelog support. Thanks for
> >>>> all the contributions.
> >>>>
> >>>> If there are no objections, I would continue with a voting.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 09.09.20 14:31, Danny Chan wrote:
> >>>>> Thanks, i'm fine with that.
> >>>>>
> >>>>>
> >>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
> >>>>>
> >>>>>> I agree with Jark. It reduces confusion.
> >>>>>>
> >>>>>> The DataStream API doesn't know changelog processing at all. A
> >>>>>> DataStream of Row can be used with both `fromDataStream` and
> >>>>>> `fromChangelogStream`. But only the latter API will interpret it as
> a
> >>>>>> changelog something.
> >>>>>>
> >>>>>> And as I mentioned before, the `toChangelogStream` must work with
> Row
> >>>>>> otherwise users are confused due to duplicate records with a missing
> >>>>>> changeflag.
> >>>>>>
> >>>>>> I will update the FLIP-136 a last time. I hope we can then continue
> >> to a
> >>>>>> vote.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>>>> I think it would bring in much confusion by a different API name
> just
> >>>>>> because the DataStream generic type is different.
> >>>>>>> If there are ChangelogMode that only works for Row, can we have a
> >> type
> >>>>>> check there ?
> >>>>>>>
> >>>>>>> Switch to a new API name does not really solve the problem well,
> >> people
> >>>>>> still need to declare the ChangelogMode explicitly, and there are
> some
> >>>>>> confusions:
> >>>>>>>
> >>>>>>> • Should DataStream of Row type always use #fromChangelogStream ?
> >>>>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Danny Chan
> >>>>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
> >>>>>>>> I had this in the inital design, but Jark had concerns at least
> for
> >> the
> >>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>>>
> >>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> >> possible.
> >>>>>>>>
> >>>>>>>> But in this case I would vote for a symmetric API. If we keep
> >>>>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>>>
> >>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
> retractions
> >>>>>>>> cannot be represented for non-Rows and users will experience
> >> duplicate
> >>>>>>>> records with a missing changeflag.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>>>> “But I think the planner needs to
> >>>>>>>>> know whether the input is insert-only or not.”
> >>>>>>>>>
> >>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>>>
> >>>>>>>>> solve your concerns ? People can pass around whatever
> ChangelogMode
> >>>>>> they like as an optional param.
> >>>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
> >> is
> >>>>>> INSERT.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Danny Chan
> >>>>>>>>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> >>>>>>>>>>
> >>>>>>>>>> But I think the planner needs to
> >>>>>>>>>> know whether the input is insert-only or not.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Jark,

the fieldNames map is not intended for users. I would also be fine to 
make it a default-scope constructor and access it with some internal 
utility class next to the Row class. The fieldNames map must only be 
used by serializers and converters. A user has no benefit in using it.

For the creation of new rows (without reuse, which usually only advanced 
users do), I don't see a benefit in having:

final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
reuse.setField("myField", 12);
reuse.setField("myOtherField", "This is a test");

The purpose of Row.withNames() is to create a Row easily and readably 
without declaring 50+ column names or dealing with indices in this range.
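
For illustration, a minimal sketch of the intended usage (the method names 
follow the current FLIP draft and may still change):

// name-based access only; no arity or indices declared up front
final Row row = Row.withNames();
row.setField("myField", 12);
row.setField("myOtherField", "This is a test");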

Personally, I would like to make Row an interface and have concrete row 
implementations for different purposes, but this would break existing 
programs too much.

What do you think?

Regards,
Timo


On 18.09.20 11:04, Jark Wu wrote:
> Personally I think the fieldNames Map is confusing and not handy.
> I just have an idea but not sure what you think.
> What about adding a new constructor with List field names, this enables all
> name-based setter/getters.
> Regarding to List -> Map cost for every record, we can suggest users to
> reuse the Row in the task.
> 
> new Row(int arity)
> new Row(List<String> fieldNames)
> 
> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> reuse.setField("myField", 12);
> reuse.setField("myOtherField", "This is a test");
> 
> My point is that, if we can have a handy constructor for named Row, we may
> not need to distinguish the named-only or positionAndNamed mode.
> This can avoid (fast-fail) the potential problem when setting an invalid
> field.
> 
> We can also come up with a new class for the field names which will
> construct the Map and be shared among all Row instances.
> 
> What do you think?
> 
> Best,
> Jark
> 
> On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org> wrote:
> 
>> Hi everyone,
>>
>> thanks for all the feedback. I updated the FLIP again on Thursday to
>> integrate the feedback I got from Jingsong and Jark offline. In
>> particular I updated the `Improve dealing with Row in DataStream API`
>> section another time. We introduced static methods for Row that should
>> make the semantics clear to users:
>>
>> // allows to use index-based setters and getters (equivalent to new
>> Row(int))
>> // method exists for completeness
>> public static withPositions(int length);
>>
>> // allows to use name-based setters and getters
>> public static withNames();
>>
>> // allows to use both name-based and position-based setters and getters
>> public static withNamesAndPositions(Map<String, Integer> fieldNames);
>>
>> In any case, non of the existing methods will be deprecated and only
>> additional functionality will be available through the methods above.
>>
>> I started a voting thread on Friday. Please feel free to vote.
>>
>> Regards,
>> Timo
>>
>> On 10.09.20 10:21, Danny Chan wrote:
>>> Thanks for driving this Timo, +1 for voting ~
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月10日 +0800 PM3:54,Timo Walther <tw...@apache.org>,写道:
>>>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>>>> outcome. I think the result is one of the last core API refactoring and
>>>> users will be happy to have a consistent changelog support. Thanks for
>>>> all the contributions.
>>>>
>>>> If there are no objections, I would continue with a voting.
>>>>
>>>> What do you think?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 09.09.20 14:31, Danny Chan wrote:
>>>>> Thanks, i'm fine with that.
>>>>>
>>>>>
>>>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
>>>>>
>>>>>> I agree with Jark. It reduces confusion.
>>>>>>
>>>>>> The DataStream API doesn't know changelog processing at all. A
>>>>>> DataStream of Row can be used with both `fromDataStream` and
>>>>>> `fromChangelogStream`. But only the latter API will interpret it as a
>>>>>> changelog something.
>>>>>>
>>>>>> And as I mentioned before, the `toChangelogStream` must work with Row
>>>>>> otherwise users are confused due to duplicate records with a missing
>>>>>> changeflag.
>>>>>>
>>>>>> I will update the FLIP-136 a last time. I hope we can then continue
>> to a
>>>>>> vote.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>>>> I think it would bring in much confusion by a different API name just
>>>>>> because the DataStream generic type is different.
>>>>>>> If there are ChangelogMode that only works for Row, can we have a
>> type
>>>>>> check there ?
>>>>>>>
>>>>>>> Switch to a new API name does not really solve the problem well,
>> people
>>>>>> still need to declare the ChangelogMode explicitly, and there are some
>>>>>> confusions:
>>>>>>>
>>>>>>> • Should DataStream of Row type always use #fromChangelogStream ?
>>>>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Danny Chan
>>>>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
>>>>>>>> I had this in the inital design, but Jark had concerns at least for
>> the
>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>>>
>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
>> possible.
>>>>>>>>
>>>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>>>
>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>>>>>> cannot be represented for non-Rows and users will experience
>> duplicate
>>>>>>>> records with a missing changeflag.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>>>> “But I think the planner needs to
>>>>>>>>> know whether the input is insert-only or not.”
>>>>>>>>>
>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>>>
>>>>>>>>> solve your concerns ? People can pass around whatever ChangelogMode
>>>>>> they like as an optional param.
>>>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
>> is
>>>>>> INSERT.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Danny Chan
>>>>>>>>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
>>>>>>>>>>
>>>>>>>>>> But I think the planner needs to
>>>>>>>>>> know whether the input is insert-only or not.
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Personally I think the fieldNames Map is confusing and not handy.
I just have an idea, but I am not sure what you think.
What about adding a new constructor with a List of field names? This enables all
name-based setters/getters.
Regarding the List -> Map cost for every record, we can suggest that users
reuse the Row in the task.

new Row(int arity)
new Row(List<String> fieldNames)

final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
reuse.setField("myField", 12);
reuse.setField("myOtherField", "This is a test");

My point is that, if we have a handy constructor for named Rows, we may
not need to distinguish between the named-only and positionAndNamed modes.
This can avoid (by failing fast) the potential problem of setting an invalid
field.

We can also come up with a new class for the field names which will
construct the Map and be shared among all Row instances.
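
Something like this hypothetical sketch (RowFieldNames is a made-up name, 
not existing API):

// built once; shares the name -> index map across all Row instances
final RowFieldNames names = new RowFieldNames("myField", "myOtherField");
final Row row1 = new Row(names);
final Row row2 = new Row(names);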

What do you think?

Best,
Jark

On Thu, 17 Sep 2020 at 16:48, Timo Walther <tw...@apache.org> wrote:

> Hi everyone,
>
> thanks for all the feedback. I updated the FLIP again on Thursday to
> integrate the feedback I got from Jingsong and Jark offline. In
> particular I updated the `Improve dealing with Row in DataStream API`
> section another time. We introduced static methods for Row that should
> make the semantics clear to users:
>
> // allows to use index-based setters and getters (equivalent to new
> Row(int))
> // method exists for completeness
> public static withPositions(int length);
>
> // allows to use name-based setters and getters
> public static withNames();
>
> // allows to use both name-based and position-based setters and getters
> public static withNamesAndPositions(Map<String, Integer> fieldNames);
>
> In any case, non of the existing methods will be deprecated and only
> additional functionality will be available through the methods above.
>
> I started a voting thread on Friday. Please feel free to vote.
>
> Regards,
> Timo
>
> On 10.09.20 10:21, Danny Chan wrote:
> > Thanks for driving this Timo, +1 for voting ~
> >
> > Best,
> > Danny Chan
> > 在 2020年9月10日 +0800 PM3:54,Timo Walther <tw...@apache.org>,写道:
> >> Thanks everyone for this healthy discussion. I updated the FLIP with the
> >> outcome. I think the result is one of the last core API refactoring and
> >> users will be happy to have a consistent changelog support. Thanks for
> >> all the contributions.
> >>
> >> If there are no objections, I would continue with a voting.
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 09.09.20 14:31, Danny Chan wrote:
> >>> Thanks, i'm fine with that.
> >>>
> >>>
> >>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
> >>>
> >>>> I agree with Jark. It reduces confusion.
> >>>>
> >>>> The DataStream API doesn't know changelog processing at all. A
> >>>> DataStream of Row can be used with both `fromDataStream` and
> >>>> `fromChangelogStream`. But only the latter API will interpret it as a
> >>>> changelog something.
> >>>>
> >>>> And as I mentioned before, the `toChangelogStream` must work with Row
> >>>> otherwise users are confused due to duplicate records with a missing
> >>>> changeflag.
> >>>>
> >>>> I will update the FLIP-136 a last time. I hope we can then continue
> to a
> >>>> vote.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>> I think it would bring in much confusion by a different API name just
> >>>> because the DataStream generic type is different.
> >>>>> If there are ChangelogMode that only works for Row, can we have a
> type
> >>>> check there ?
> >>>>>
> >>>>> Switch to a new API name does not really solve the problem well,
> people
> >>>> still need to declare the ChangelogMode explicitly, and there are some
> >>>> confusions:
> >>>>>
> >>>>> • Should DataStream of Row type always use #fromChangelogStream ?
> >>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Danny Chan
> >>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
> >>>>>> I had this in the inital design, but Jark had concerns at least for
> the
> >>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>
> >>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> possible.
> >>>>>>
> >>>>>> But in this case I would vote for a symmetric API. If we keep
> >>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>
> >>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
> >>>>>> cannot be represented for non-Rows and users will experience
> duplicate
> >>>>>> records with a missing changeflag.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>> “But I think the planner needs to
> >>>>>>> know whether the input is insert-only or not.”
> >>>>>>>
> >>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>
> >>>>>>> solve your concerns ? People can pass around whatever ChangelogMode
> >>>> they like as an optional param.
> >>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
> is
> >>>> INSERT.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Danny Chan
> >>>>>>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> >>>>>>>>
> >>>>>>>> But I think the planner needs to
> >>>>>>>> know whether the input is insert-only or not.
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi everyone,

thanks for all the feedback. I updated the FLIP again on Thursday to 
integrate the feedback I got from Jingsong and Jark offline. In 
particular I updated the `Improve dealing with Row in DataStream API` 
section another time. We introduced static methods for Row that should 
make the semantics clear to users:

// allows using index-based setters and getters (equivalent to new Row(int));
// this method exists for completeness
public static Row withPositions(int length);

// allows using name-based setters and getters
public static Row withNames();

// allows using both name-based and position-based setters and getters
public static Row withNamesAndPositions(Map<String, Integer> fieldNames);

In any case, none of the existing methods will be deprecated, and only 
additional functionality will be available through the methods above.
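
An illustrative sketch of how the factory methods would be used (based on 
the FLIP draft, subject to change):

// index-based access, equivalent to new Row(2)
Row positioned = Row.withPositions(2);
positioned.setField(0, "Bob");
positioned.setField(1, 12);

// name-based access, no arity declared up front
Row named = Row.withNames();
named.setField("user", "Bob");
named.setField("amount", 12);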

I started a voting thread on Friday. Please feel free to vote.

Regards,
Timo

On 10.09.20 10:21, Danny Chan wrote:
> Thanks for driving this Timo, +1 for voting ~
> 
> Best,
> Danny Chan
> 在 2020年9月10日 +0800 PM3:54,Timo Walther <tw...@apache.org>,写道:
>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>> outcome. I think the result is one of the last core API refactoring and
>> users will be happy to have a consistent changelog support. Thanks for
>> all the contributions.
>>
>> If there are no objections, I would continue with a voting.
>>
>> What do you think?
>>
>> Regards,
>> Timo
>>
>>
>> On 09.09.20 14:31, Danny Chan wrote:
>>> Thanks, i'm fine with that.
>>>
>>>
>>> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
>>>
>>>> I agree with Jark. It reduces confusion.
>>>>
>>>> The DataStream API doesn't know changelog processing at all. A
>>>> DataStream of Row can be used with both `fromDataStream` and
>>>> `fromChangelogStream`. But only the latter API will interpret it as a
>>>> changelog something.
>>>>
>>>> And as I mentioned before, the `toChangelogStream` must work with Row
>>>> otherwise users are confused due to duplicate records with a missing
>>>> changeflag.
>>>>
>>>> I will update the FLIP-136 a last time. I hope we can then continue to a
>>>> vote.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>> I think it would bring in much confusion by a different API name just
>>>> because the DataStream generic type is different.
>>>>> If there are ChangelogMode that only works for Row, can we have a type
>>>> check there ?
>>>>>
>>>>> Switch to a new API name does not really solve the problem well, people
>>>> still need to declare the ChangelogMode explicitly, and there are some
>>>> confusions:
>>>>>
>>>>> • Should DataStream of Row type always use #fromChangelogStream ?
>>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>>>>>
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
>>>>>> I had this in the inital design, but Jark had concerns at least for the
>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>
>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>>>>>>
>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>
>>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>>>> cannot be represented for non-Rows and users will experience duplicate
>>>>>> records with a missing changeflag.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>> “But I think the planner needs to
>>>>>>> know whether the input is insert-only or not.”
>>>>>>>
>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>
>>>>>>> solve your concerns ? People can pass around whatever ChangelogMode
>>>> they like as an optional param.
>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
>>>> INSERT.
>>>>>>>
>>>>>>> Best,
>>>>>>> Danny Chan
>>>>>>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
>>>>>>>>
>>>>>>>> But I think the planner needs to
>>>>>>>> know whether the input is insert-only or not.
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
Thanks for driving this Timo, +1 for voting ~

Best,
Danny Chan
On 2020-09-10 3:54 PM +0800, Timo Walther <tw...@apache.org> wrote:
> Thanks everyone for this healthy discussion. I updated the FLIP with the
> outcome. I think the result is one of the last core API refactoring and
> users will be happy to have a consistent changelog support. Thanks for
> all the contributions.
>
> If there are no objections, I would continue with a voting.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 09.09.20 14:31, Danny Chan wrote:
> > Thanks, i'm fine with that.
> >
> >
> > Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
> >
> > > I agree with Jark. It reduces confusion.
> > >
> > > The DataStream API doesn't know changelog processing at all. A
> > > DataStream of Row can be used with both `fromDataStream` and
> > > `fromChangelogStream`. But only the latter API will interpret it as a
> > > changelog something.
> > >
> > > And as I mentioned before, the `toChangelogStream` must work with Row
> > > otherwise users are confused due to duplicate records with a missing
> > > changeflag.
> > >
> > > I will update the FLIP-136 a last time. I hope we can then continue to a
> > > vote.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 09.09.20 10:50, Danny Chan wrote:
> > > > I think it would bring in much confusion by a different API name just
> > > because the DataStream generic type is different.
> > > > If there are ChangelogMode that only works for Row, can we have a type
> > > check there ?
> > > >
> > > > Switch to a new API name does not really solve the problem well, people
> > > still need to declare the ChangelogMode explicitly, and there are some
> > > confusions:
> > > >
> > > > • Should DataStream of Row type always use #fromChangelogStream ?
> > > > • Does fromChangelogStream works for only INSERT ChangelogMode ?
> > > >
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
> > > > > I had this in the inital design, but Jark had concerns at least for the
> > > > > `toChangelogStream(ChangelogMode)` (see earlier discussion).
> > > > >
> > > > > `fromDataStream(dataStream, schema, changelogMode)` would be possible.
> > > > >
> > > > > But in this case I would vote for a symmetric API. If we keep
> > > > > toChangelogStream we should also have a fromChangelogStream.
> > > > >
> > > > > And if we unify `toChangelogStream` and `toDataStream`, retractions
> > > > > cannot be represented for non-Rows and users will experience duplicate
> > > > > records with a missing changeflag.
> > > > >
> > > > > Regards,
> > > > > Timo
> > > > >
> > > > >
> > > > > On 09.09.20 09:31, Danny Chan wrote:
> > > > > > “But I think the planner needs to
> > > > > > know whether the input is insert-only or not.”
> > > > > >
> > > > > > Does fromDataStream(dataStream, schema, changelogMode)
> > > > > >
> > > > > > solve your concerns ? People can pass around whatever ChangelogMode
> > > they like as an optional param.
> > > > > > By default: fromDataStream(dataStream, schema), the ChangelogMode is
> > > INSERT.
> > > > > >
> > > > > > Best,
> > > > > > Danny Chan
> > > > > > 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> > > > > > >
> > > > > > > But I think the planner needs to
> > > > > > > know whether the input is insert-only or not.
> > > > > >
> > > > >
> > > >
> > >
> > >
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Thanks everyone for this healthy discussion. I updated the FLIP with the 
outcome. I think the result is one of the last core API refactorings, and 
users will be happy to have consistent changelog support. Thanks for 
all the contributions.

If there are no objections, I would continue with a vote.

What do you think?

Regards,
Timo


On 09.09.20 14:31, Danny Chan wrote:
> Thanks, i'm fine with that.
> 
> 
> Timo Walther <tw...@apache.org> 于2020年9月9日周三 下午7:02写道:
> 
>> I agree with Jark. It reduces confusion.
>>
>> The DataStream API doesn't know changelog processing at all. A
>> DataStream of Row can be used with both `fromDataStream` and
>> `fromChangelogStream`. But only the latter API will interpret it as a
>> changelog something.
>>
>> And as I mentioned before, the `toChangelogStream` must work with Row
>> otherwise users are confused due to duplicate records with a missing
>> changeflag.
>>
>> I will update the FLIP-136 a last time. I hope we can then continue to a
>> vote.
>>
>> Regards,
>> Timo
>>
>>
>> On 09.09.20 10:50, Danny Chan wrote:
>>> I think it would bring in much confusion by a different API name just
>> because the DataStream generic type is different.
>>> If there are ChangelogMode that only works for Row, can we have a type
>> check there ?
>>>
>>> Switch to a new API name does not really solve the problem well, people
>> still need to declare the ChangelogMode explicitly, and there are some
>> confusions:
>>>
>>> • Should DataStream of Row type always use #fromChangelogStream ?
>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>>>
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
>>>> I had this in the inital design, but Jark had concerns at least for the
>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>
>>>> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>>>>
>>>> But in this case I would vote for a symmetric API. If we keep
>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>
>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>> cannot be represented for non-Rows and users will experience duplicate
>>>> records with a missing changeflag.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>> “But I think the planner needs to
>>>>> know whether the input is insert-only or not.”
>>>>>
>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>
>>>>> solve your concerns ?  People can pass around whatever ChangelogMode
>> they like as an optional param.
>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
>> INSERT.
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
>>>>>>
>>>>>> But I think the planner needs to
>>>>>> know whether the input is insert-only or not.
>>>>>
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <da...@apache.org>.
Thanks, I'm fine with that.


Timo Walther <tw...@apache.org> wrote on Wed, 9 Sep 2020 at 7:02 PM:

> I agree with Jark. It reduces confusion.
>
> The DataStream API doesn't know changelog processing at all. A
> DataStream of Row can be used with both `fromDataStream` and
> `fromChangelogStream`. But only the latter API will interpret it as a
> changelog something.
>
> And as I mentioned before, the `toChangelogStream` must work with Row
> otherwise users are confused due to duplicate records with a missing
> changeflag.
>
> I will update the FLIP-136 a last time. I hope we can then continue to a
> vote.
>
> Regards,
> Timo
>
>
> On 09.09.20 10:50, Danny Chan wrote:
> > I think it would bring in much confusion by a different API name just
> because the DataStream generic type is different.
> > If there are ChangelogMode that only works for Row, can we have a type
> check there ?
> >
> > Switch to a new API name does not really solve the problem well, people
> still need to declare the ChangelogMode explicitly, and there are some
> confusions:
> >
> > • Should DataStream of Row type always use #fromChangelogStream ?
> > • Does fromChangelogStream works for only INSERT ChangelogMode ?
> >
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
> >> I had this in the inital design, but Jark had concerns at least for the
> >> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>
> >> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
> >>
> >> But in this case I would vote for a symmetric API. If we keep
> >> toChangelogStream we should also have a fromChangelogStream.
> >>
> >> And if we unify `toChangelogStream` and `toDataStream`, retractions
> >> cannot be represented for non-Rows and users will experience duplicate
> >> records with a missing changeflag.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 09.09.20 09:31, Danny Chan wrote:
> >>> “But I think the planner needs to
> >>> know whether the input is insert-only or not.”
> >>>
> >>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>
> >>> solve your concerns ?  People can pass around whatever ChangelogMode
> they like as an optional param.
> >>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
> INSERT.
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> >>>>
> >>>> But I think the planner needs to
> >>>> know whether the input is insert-only or not.
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
I agree with Jark. It reduces confusion.

The DataStream API doesn't know about changelog processing at all. A 
DataStream of Row can be used with both `fromDataStream` and 
`fromChangelogStream`, but only the latter API will interpret it as a 
changelog.

And as I mentioned before, `toChangelogStream` must work with Row; 
otherwise users are confused by duplicate records with a missing 
changeflag.
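
A rough sketch of the resulting split (signatures per the FLIP draft and 
subject to change; tableEnv is a StreamTableEnvironment, rowStream a 
DataStream<Row>):

// rows are always treated as insert-only
Table appendOnly = tableEnv.fromDataStream(rowStream);
// the changeflag of every Row is interpreted
Table changelog = tableEnv.fromChangelogStream(rowStream);
// every emitted Row carries its changeflag
DataStream<Row> out = tableEnv.toChangelogStream(changelog);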

I will update FLIP-136 one last time. I hope we can then continue to a 
vote.

Regards,
Timo


On 09.09.20 10:50, Danny Chan wrote:
> I think it would bring in much confusion by a different API name just because the DataStream generic type is different.
> If there are ChangelogMode that only works for Row, can we have a type check there ?
> 
> Switch to a new API name does not really solve the problem well, people still need to declare the ChangelogMode explicitly, and there are some confusions:
> 
> • Should DataStream of Row type always use #fromChangelogStream ?
> • Does fromChangelogStream works for only INSERT ChangelogMode ?
> 
> 
> Best,
> Danny Chan
> 在 2020年9月9日 +0800 PM4:21,Timo Walther <tw...@apache.org>,写道:
>> I had this in the inital design, but Jark had concerns at least for the
>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>
>> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>>
>> But in this case I would vote for a symmetric API. If we keep
>> toChangelogStream we should also have a fromChangelogStream.
>>
>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>> cannot be represented for non-Rows and users will experience duplicate
>> records with a missing changeflag.
>>
>> Regards,
>> Timo
>>
>>
>> On 09.09.20 09:31, Danny Chan wrote:
>>> “But I think the planner needs to
>>> know whether the input is insert-only or not.”
>>>
>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>
>>> solve your concerns ?  People can pass around whatever ChangelogMode they like as an optional param.
>>> By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
>>>>
>>>> But I think the planner needs to
>>>> know whether the input is insert-only or not.
>>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
I think a different API name just because the DataStream generic type is different would bring in much confusion.
If there are ChangelogModes that only work for Row, can we have a type check there?

Switching to a new API name does not really solve the problem well; people still need to declare the ChangelogMode explicitly, and there are some confusions:

• Should a DataStream of Row type always use #fromChangelogStream?
• Does fromChangelogStream work only for the INSERT ChangelogMode?


Best,
Danny Chan
On 2020-09-09 4:21 PM +0800, Timo Walther <tw...@apache.org> wrote:
> I had this in the inital design, but Jark had concerns at least for the
> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>
> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>
> But in this case I would vote for a symmetric API. If we keep
> toChangelogStream we should also have a fromChangelogStream.
>
> And if we unify `toChangelogStream` and `toDataStream`, retractions
> cannot be represented for non-Rows and users will experience duplicate
> records with a missing changeflag.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:31, Danny Chan wrote:
> > “But I think the planner needs to
> > know whether the input is insert-only or not.”
> >
> > Does fromDataStream(dataStream, schema, changelogMode)
> >
> > solve your concerns ?  People can pass around whatever ChangelogMode they like as an optional param.
> > By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
> > >
> > > But I think the planner needs to
> > > know whether the input is insert-only or not.
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
I had this in the initial design, but Jark had concerns at least for the 
`toChangelogStream(ChangelogMode)` (see earlier discussion).

`fromDataStream(dataStream, schema, changelogMode)` would be possible.

But in this case I would vote for a symmetric API. If we keep 
toChangelogStream we should also have a fromChangelogStream.

And if we unify `toChangelogStream` and `toDataStream`, retractions 
cannot be represented for non-Rows, and users will experience duplicate 
records with a missing changeflag.
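
To make that concrete (a hedged sketch; User is a hypothetical POJO):

// with a unified conversion, an UPDATE on the table would arrive as two
// User records that look like plain duplicates, since a POJO has no
// place for the changeflag
DataStream<User> users = tableEnv.toDataStream(updatingTable, DataTypes.of(User.class));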

Regards,
Timo


On 09.09.20 09:31, Danny Chan wrote:
> “But I think the planner needs to
> know whether the input is insert-only or not.”
> 
> Does fromDataStream(dataStream, schema, changelogMode)
> 
> solve your concerns ?  People can pass around whatever ChangelogMode they like as an optional param.
> By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
> 
> Best,
> Danny Chan
> 在 2020年9月9日 +0800 PM2:53,dev@flink.apache.org,写道:
>>
>> But I think the planner needs to
>> know whether the input is insert-only or not.
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
“But I think the planner needs to
know whether the input is insert-only or not.”

Does fromDataStream(dataStream, schema, changelogMode)

solve your concerns? People can pass around whatever ChangelogMode they like as an optional param.
By default, with fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
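
For example (the three-argument overload is only a proposal here; 
ChangelogMode.insertOnly() and ChangelogMode.all() are existing factory methods):

// explicit changelog semantics
Table changelog = tableEnv.fromDataStream(rowStream, schema, ChangelogMode.all());
// defaults to ChangelogMode.insertOnly()
Table appendOnly = tableEnv.fromDataStream(rowStream, schema);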

Best,
Danny Chan
On 2020-09-09 2:53 PM +0800, dev@flink.apache.org wrote:
>
> But I think the planner needs to
> know whether the input is insert-only or not.

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
The conclusion is that we will drop `fromChangelogStream(ChangelogMode, 
DataStream<Row>)` but will keep `fromChangelogStream(DataStream<Row>)`. 
The latter is necessary to have a per-record changeflag.
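
As a minimal sketch, the per-record changeflag is the RowKind attached to 
every Row (assuming the Row.ofKind shorthand for creating a Row with a 
given kind):

// an insertion, and the retraction of a previous version of the same record
Row insert = Row.ofKind(RowKind.INSERT, "Bob", 12);
Row retract = Row.ofKind(RowKind.UPDATE_BEFORE, "Bob", 12);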

We could think about merging `fromChangelogStream`/`fromDataStream` and 
`toChangelogStream`/`toDataStream`. But I think the planner needs to 
know whether the input is insert-only or not. Especially for bounded streams 
this information will be useful in the future. Also, outputting a query 
to a non-Row type for retraction queries doesn't make much sense if a 
changeflag is missing.

What do you think?

Regards,
Timo

On 09.09.20 08:34, Danny Chan wrote:
> Thanks for the summary Timo ~
> 
> I want to clarify a little bit, what is the conclusion about the fromChangelogStream and toChangelogStream, should we use this name or we use fromDataStream with an optional ChangelogMode flag ?
> 
> Best,
> Danny Chan
> 在 2020年9月8日 +0800 PM8:22,Timo Walther <tw...@apache.org>,写道:
>> Hi Danny,
>>
>> Your proposed signatures sound good to me.
>>
>> fromDataStream(dataStream, Schema)
>> toDataStream(table, AbstractDataType<?>)
>>
>> They address all my concerns. The API would not be symmetric anymore,
>> but this is fine with me. Others raised concerns about deprecating
>> `fromDataStream(dataStream, Expression)`. Are they fine with this as well?
>>
>> If there are no objections, I would update the FLIP with the methods
>> above. Bu let me briefly summarize my thoughts on this again, so that we
>> are all on the same page:
>> - The biggest discussion point seems the fromInsertStream/toInsertStream.
>> - I don’t have a strong opinion on naming, from/toDataStream would be
>> fine for me. But:
>> - It slightly different type mappings and might break existing pipelines
>> silently. This point can be neglected as the differences are only minor.
>> - We need a way of declaring the rowtime attribute but without declaring
>> all columns again. Reduce manual schema work as much as possible.
>> - Both Dawid and I don’t like the current either “position based” or
>> “name based” expression logic that looks like a projection but is not.
>> - Actually name based expressions are not necessary, since we have
>> positions for all new data types.
>> - Schema is not suitable to influence the output type for toDataStream.
>> It should be DataType.
>>
>> All items are solved by Danny's suggestion.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 08.09.20 14:04, Danny Chan wrote:
>>> Hi, Timo ~
>>>
>>> "It is not about changelog mode compatibility, it is about the type
>>> compatibility.”
>>>
>>> For fromDataStream(dataStream, Schema), there should not be compatibility problem or data type inconsistency. We know the logical type from Schema and physical type from the dataStream itself.
>>>
>>> For toDataStream(table, AbstractDataType<?>), we can get the logical type from the table itself
>>> and the physical type from the passed data type.
>>>
>>> If both behavior are deterministic, what's the problem for type compatibility and safety?
>>>
>>> My concern is that in most of the cases, people use the "insert stream", they do not need to care about
>>> the data stream ChangelogMode, so there is no need to distinguish them from the APIs, an optional param is enough. If we introduces 2 new API there, people have to choose between them, and can fromChangelogStream()
>>> accept an insert stream ? What is the behavior if fromInsertStream() accepts a changelog stream ?
>>>
>>>
>>> "This means potentially duplicate definition of fields and their data types etc”
>>>
>>> I agree, because table already has an underlying schema there.
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月3日 +0800 PM8:12,Timo Walther <tw...@apache.org>,写道:
>>>> Hi Danny,
>>>>
>>>> "if ChangelogMode.INSERT is the default, existing pipelines should be
>>>> compatible"
>>>>
>>>> It is not about changelog mode compatibility, it is about the type
>>>> compatibility. The renaming to `toInsertStream` is only to have a mean
>>>> of dealing with data type inconsistencies that could break existing
>>>> pipelines.
>>>>
>>>> As the FLIP describes, the following new behavior should be implemented:
>>>>
>>>> - It does this by translating the TypeInformation to DataType.
>>>> - This will happen with a new TypeInfoDataTypeConverter that will no
>>>> longer produce LegacyTypeInformationType.
>>>> - All types from DataStream API should be supported by this converter.
>>>> - TupleTypeInfoBase will be translated into a proper RowType or
>>>> StructuredType.
>>>> - BigDecimals will be converted to DECIMAL(38,18) by default.
>>>> - Composite types (tuples, POJOs, rows) will be flattened by default if
>>>> they are used as top-level records (similar to the old behavior).
>>>> - The order of POJO field's is determined by the DataTypeExtractor and
>>>> must not be defined manually anymore.
>>>> - GenericTypeInfo is converted to RawType immediately by considering the
>>>> current configuration.
>>>> - A DataStream that originated from Table API will keep its DataType
>>>> information due to ExternalTypeInfo implementing DataTypeQueryable.
>>>>
>>>> I would feel safer if we do this under a new method name.
>>>>
>>>> "toDataStream(table, schema.bindTo(DataType))"
>>>>
>>>> This is what I meant with "integrate the DataType into the Schema class
>>>> itself". Yes, we can do that if everybody is fine with it. But why
>>>> should a user specify both a schema and a data type? This means
>>>> potentially duplicate definition of fields and their data types etc.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 03.09.20 11:31, Danny Chan wrote:
>>>>> "It is a more conservative approach to introduce that in a
>>>>> new method rather than changing the existing one under the hood and
>>>>> potentially break existing pipelines silently”
>>>>>
>>>>> I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
>>>>>
>>>>> “for `toDataStream` users need to be
>>>>> able to express whether they would prefer Row, POJO or atomic”
>>>>>
>>>>> I think most of the cases people do not need to convert the stream to a Row or POJO, because the table projection always returns a flatternned internal row, if people did want a POJO there, how about we bind the DataType to the existing schema, like this
>>>>>
>>>>> toDataStream(table, schema.bindTo(DataType))
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> 在 2020年9月3日 +0800 PM3:18,dev@flink.apache.org,写道:
>>>>>>
>>>>>> It is a more conservative approach to introduce that in a
>>>>>> new method rather than changing the existing one under the hood and
>>>>>> potentially break existing pipelines silently
>>>>>
>>>>
>>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
Thanks for the summary Timo ~

I want to clarify a little bit: what is the conclusion about fromChangelogStream and toChangelogStream? Should we use these names, or should we use fromDataStream with an optional ChangelogMode flag?

Best,
Danny Chan
On 2020-09-08 8:22 PM +0800, Timo Walther <tw...@apache.org> wrote:
> Hi Danny,
>
> Your proposed signatures sound good to me.
>
> fromDataStream(dataStream, Schema)
> toDataStream(table, AbstractDataType<?>)
>
> They address all my concerns. The API would not be symmetric anymore,
> but this is fine with me. Others raised concerns about deprecating
> `fromDataStream(dataStream, Expression)`. Are they fine with this as well?
>
> If there are no objections, I would update the FLIP with the methods
> above. Bu let me briefly summarize my thoughts on this again, so that we
> are all on the same page:
> - The biggest discussion point seems the fromInsertStream/toInsertStream.
> - I don’t have a strong opinion on naming, from/toDataStream would be
> fine for me. But:
> - It slightly different type mappings and might break existing pipelines
> silently. This point can be neglected as the differences are only minor.
> - We need a way of declaring the rowtime attribute but without declaring
> all columns again. Reduce manual schema work as much as possible.
> - Both Dawid and I don’t like the current either “position based” or
> “name based” expression logic that looks like a projection but is not.
> - Actually name based expressions are not necessary, since we have
> positions for all new data types.
> - Schema is not suitable to influence the output type for toDataStream.
> It should be DataType.
>
> All items are solved by Danny's suggestion.
>
> Regards,
> Timo
>
>
>
> On 08.09.20 14:04, Danny Chan wrote:
> > Hi, Timo ~
> >
> > "It is not about changelog mode compatibility, it is about the type
> > compatibility.”
> >
> > For fromDataStream(dataStream, Schema), there should not be compatibility problem or data type inconsistency. We know the logical type from Schema and physical type from the dataStream itself.
> >
> > For toDataStream(table, AbstractDataType<?>), we can get the logical type from the table itself
> > and the physical type from the passed data type.
> >
> > If both behavior are deterministic, what's the problem for type compatibility and safety?
> >
> > My concern is that in most of the cases, people use the "insert stream", they do not need to care about
> > the data stream ChangelogMode, so there is no need to distinguish them from the APIs, an optional param is enough. If we introduces 2 new API there, people have to choose between them, and can fromChangelogStream()
> > accept an insert stream ? What is the behavior if fromInsertStream() accepts a changelog stream ?
> >
> >
> > "This means potentially duplicate definition of fields and their data types etc”
> >
> > I agree, because table already has an underlying schema there.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月3日 +0800 PM8:12,Timo Walther <tw...@apache.org>,写道:
> > > Hi Danny,
> > >
> > > "if ChangelogMode.INSERT is the default, existing pipelines should be
> > > compatible"
> > >
> > > It is not about changelog mode compatibility, it is about the type
> > > compatibility. The renaming to `toInsertStream` is only to have a mean
> > > of dealing with data type inconsistencies that could break existing
> > > pipelines.
> > >
> > > As the FLIP describes, the following new behavior should be implemented:
> > >
> > > - It does this by translating the TypeInformation to DataType.
> > > - This will happen with a new TypeInfoDataTypeConverter that will no
> > > longer produce LegacyTypeInformationType.
> > > - All types from DataStream API should be supported by this converter.
> > > - TupleTypeInfoBase will be translated into a proper RowType or
> > > StructuredType.
> > > - BigDecimals will be converted to DECIMAL(38,18) by default.
> > > - Composite types (tuples, POJOs, rows) will be flattened by default if
> > > they are used as top-level records (similar to the old behavior).
> > > - The order of POJO field's is determined by the DataTypeExtractor and
> > > must not be defined manually anymore.
> > > - GenericTypeInfo is converted to RawType immediately by considering the
> > > current configuration.
> > > - A DataStream that originated from Table API will keep its DataType
> > > information due to ExternalTypeInfo implementing DataTypeQueryable.
> > >
> > > I would feel safer if we do this under a new method name.
> > >
> > > "toDataStream(table, schema.bindTo(DataType))"
> > >
> > > This is what I meant with "integrate the DataType into the Schema class
> > > itself". Yes, we can do that if everybody is fine with it. But why
> > > should a user specify both a schema and a data type? This means
> > > potentially duplicate definition of fields and their data types etc.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 03.09.20 11:31, Danny Chan wrote:
> > > > "It is a more conservative approach to introduce that in a
> > > > new method rather than changing the existing one under the hood and
> > > > potentially break existing pipelines silently”
> > > >
> > > > I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
> > > >
> > > > “for `toDataStream` users need to be
> > > > able to express whether they would prefer Row, POJO or atomic”
> > > >
> > > > I think most of the cases people do not need to convert the stream to a Row or POJO, because the table projection always returns a flatternned internal row, if people did want a POJO there, how about we bind the DataType to the existing schema, like this
> > > >
> > > > toDataStream(table, schema.bindTo(DataType))
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2020年9月3日 +0800 PM3:18,dev@flink.apache.org,写道:
> > > > >
> > > > > It is a more conservative approach to introduce that in a
> > > > > new method rather than changing the existing one under the hood and
> > > > > potentially break existing pipelines silently
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Danny,

Your proposed signatures sound good to me.

fromDataStream(dataStream, Schema)
toDataStream(table, AbstractDataType<?>)

They address all my concerns. The API would not be symmetric anymore, 
but this is fine with me. Others raised concerns about deprecating 
`fromDataStream(dataStream, Expression)`. Are they fine with this as well?

If there are no objections, I would update the FLIP with the methods 
above. But let me briefly summarize my thoughts on this again, so that we 
are all on the same page:
- The biggest discussion point seems to be fromInsertStream/toInsertStream.
- I don’t have a strong opinion on naming, from/toDataStream would be 
fine for me. But:
- It has slightly different type mappings and might break existing pipelines 
silently. This point can be neglected, as the differences are only minor.
- We need a way of declaring the rowtime attribute but without declaring 
all columns again. Reduce manual schema work as much as possible.
- Neither Dawid nor I like the current “position based or name based” 
expression logic that looks like a projection but is not.
- Actually, name-based expressions are not necessary, since we have 
positions for all new data types.
- Schema is not suitable to influence the output type for toDataStream. 
It should be DataType.

All items are solved by Danny's suggestion.
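
For illustration, a rough sketch of the agreed methods in user code (User 
is a hypothetical POJO; DataTypes.of extracts a data type reflectively):

// derive all physical columns automatically from the stream's type
Table table = tableEnv.fromDataStream(dataStream);
// extract the physical output type from the given DataType
DataStream<User> users = tableEnv.toDataStream(table, DataTypes.of(User.class));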

Regards,
Timo



On 08.09.20 14:04, Danny Chan wrote:
> Hi, Timo ~
> 
> "It is not about changelog mode compatibility, it is about the type
> compatibility.”
> 
> For fromDataStream(dataStream, Schema), there should not be compatibility problem or data type inconsistency. We know the logical type from Schema and physical type from the dataStream itself.
> 
> For toDataStream(table, AbstractDataType<?>), we can get the logical type from the table itself
> and the physical type from the passed data type.
> 
> If both behavior are deterministic, what's the problem for type compatibility and safety?
> 
> My concern is that in most of the cases, people use the "insert stream", they do not need to care about
> the data stream ChangelogMode, so there is no need to distinguish them from the APIs, an optional param is enough. If we introduces 2 new API there, people have to choose between them, and can fromChangelogStream()
> accept an insert stream ? What is the behavior if fromInsertStream() accepts a changelog stream ?
> 
> 
> "This means potentially duplicate definition of fields and their data types etc”
> 
> I agree, because table already has an underlying schema there.
> 
> Best,
> Danny Chan
> 在 2020年9月3日 +0800 PM8:12,Timo Walther <tw...@apache.org>,写道:
>> Hi Danny,
>>
>> "if ChangelogMode.INSERT is the default, existing pipelines should be
>> compatible"
>>
>> It is not about changelog mode compatibility, it is about the type
>> compatibility. The renaming to `toInsertStream` is only to have a mean
>> of dealing with data type inconsistencies that could break existing
>> pipelines.
>>
>> As the FLIP describes, the following new behavior should be implemented:
>>
>> - It does this by translating the TypeInformation to DataType.
>> - This will happen with a new TypeInfoDataTypeConverter that will no
>> longer produce LegacyTypeInformationType.
>> - All types from DataStream API should be supported by this converter.
>> - TupleTypeInfoBase will be translated into a proper RowType or
>> StructuredType.
>> - BigDecimals will be converted to DECIMAL(38,18) by default.
>> - Composite types (tuples, POJOs, rows) will be flattened by default if
>> they are used as top-level records (similar to the old behavior).
>> - The order of POJO field's is determined by the DataTypeExtractor and
>> must not be defined manually anymore.
>> - GenericTypeInfo is converted to RawType immediately by considering the
>> current configuration.
>> - A DataStream that originated from Table API will keep its DataType
>> information due to ExternalTypeInfo implementing DataTypeQueryable.
>>
>> I would feel safer if we do this under a new method name.
>>
>> "toDataStream(table, schema.bindTo(DataType))"
>>
>> This is what I meant with "integrate the DataType into the Schema class
>> itself". Yes, we can do that if everybody is fine with it. But why
>> should a user specify both a schema and a data type? This means
>> potentially duplicate definition of fields and their data types etc.
>>
>> Regards,
>> Timo
>>
>>
>> On 03.09.20 11:31, Danny Chan wrote:
>>> "It is a more conservative approach to introduce that in a
>>> new method rather than changing the existing one under the hood and
>>> potentially break existing pipelines silently”
>>>
>>> I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
>>>
>>> “for `toDataStream` users need to be
>>> able to express whether they would prefer Row, POJO or atomic”
>>>
>>> I think in most cases people do not need to convert the stream to a Row or POJO, because a table projection always returns a flattened internal row. If people do want a POJO there, how about we bind the DataType to the existing schema, like this
>>>
>>> toDataStream(table, schema.bindTo(DataType))
>>>
>>> Best,
>>> Danny Chan
>>> On Sep 3, 2020 at 3:18 PM +0800, dev@flink.apache.org wrote:
>>>>
>>>> It is a more conservative approach to introduce that in a
>>>> new method rather than changing the existing one under the hood and
>>>> potentially break existing pipelines silently
>>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
Hi, Timo ~

"It is not about changelog mode compatibility, it is about the type
compatibility.”

For fromDataStream(dataStream, Schema), there should be no compatibility problem or data type inconsistency. We know the logical type from the Schema and the physical type from the dataStream itself.

For toDataStream(table, AbstractDataType<?>), we can get the logical type from the table itself
and the physical type from the passed data type.

If both behaviors are deterministic, what's the problem with type compatibility and safety?

My concern is that in most cases people use the "insert stream" and do not need to care about
the data stream's ChangelogMode, so there is no need to distinguish them in the APIs; an optional param is enough. If we introduce 2 new APIs there, people have to choose between them. Can fromChangelogStream()
accept an insert stream? What is the behavior if fromInsertStream() accepts a changelog stream?
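
A sketch of the single-method alternative described above (the overloads are hypothetical; ChangelogMode itself is existing FLIP-95 API, and stream/changelog are assumed to be in scope):

    // hypothetical overloads with an optional ChangelogMode parameter
    Table t1 = tEnv.fromDataStream(stream);                              // defaults to insert-only
    Table t2 = tEnv.fromDataStream(stream, ChangelogMode.insertOnly());  // explicit insert-only
    Table t3 = tEnv.fromDataStream(changelog, ChangelogMode.all());      // full changelog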


"This means potentially duplicate definition of fields and their data types etc”

I agree, because the table already has an underlying schema there.

Best,
Danny Chan
On Sep 3, 2020 at 8:12 PM +0800, Timo Walther <tw...@apache.org> wrote:
> Hi Danny,
>
> "if ChangelogMode.INSERT is the default, existing pipelines should be
> compatible"
>
> It is not about changelog mode compatibility, it is about the type
> compatibility. The renaming to `toInsertStream` is only to have a means
> of dealing with data type inconsistencies that could break existing
> pipelines.
>
> As the FLIP describes, the following new behavior should be implemented:
>
> - It does this by translating the TypeInformation to DataType.
> - This will happen with a new TypeInfoDataTypeConverter that will no
> longer produce LegacyTypeInformationType.
> - All types from DataStream API should be supported by this converter.
> - TupleTypeInfoBase will be translated into a proper RowType or
> StructuredType.
> - BigDecimals will be converted to DECIMAL(38,18) by default.
> - Composite types (tuples, POJOs, rows) will be flattened by default if
> they are used as top-level records (similar to the old behavior).
> - The order of POJO fields is determined by the DataTypeExtractor and
> must not be defined manually anymore.
> - GenericTypeInfo is converted to RawType immediately by considering the
> current configuration.
> - A DataStream that originated from Table API will keep its DataType
> information due to ExternalTypeInfo implementing DataTypeQueryable.
>
> I would feel safer if we do this under a new method name.
>
> "toDataStream(table, schema.bindTo(DataType))"
>
> This is what I meant with "integrate the DataType into the Schema class
> itself". Yes, we can do that if everybody is fine with it. But why
> should a user specify both a schema and a data type? This means
> potentially duplicate definition of fields and their data types etc.
>
> Regards,
> Timo
>
>
> On 03.09.20 11:31, Danny Chan wrote:
> > "It is a more conservative approach to introduce that in a
> > new method rather than changing the existing one under the hood and
> > potentially break existing pipelines silently”
> >
> > I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
> >
> > “for `toDataStream` users need to be
> > able to express whether they would prefer Row, POJO or atomic”
> >
> > I think in most cases people do not need to convert the stream to a Row or POJO, because a table projection always returns a flattened internal row. If people do want a POJO there, how about we bind the DataType to the existing schema, like this
> >
> > toDataStream(table, schema.bindTo(DataType))
> >
> > Best,
> > Danny Chan
> > On Sep 3, 2020 at 3:18 PM +0800, dev@flink.apache.org wrote:
> > >
> > > It is a more conservative approach to introduce that in a
> > > new method rather than changing the existing one under the hood and
> > > potentially break existing pipelines silently
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Danny,

"if ChangelogMode.INSERT is the default, existing pipelines should be 
compatible"

It is not about changelog mode compatibility, it is about the type 
compatibility. The renaming to `toInsertStream` is only to have a means 
of dealing with data type inconsistencies that could break existing 
pipelines.

As the FLIP describes, the following new behavior should be implemented:

- It does this by translating the TypeInformation to DataType.
- This will happen with a new TypeInfoDataTypeConverter that will no 
longer produce LegacyTypeInformationType.
- All types from DataStream API should be supported by this converter.
- TupleTypeInfoBase will be translated into a proper RowType or 
StructuredType.
- BigDecimals will be converted to DECIMAL(38,18) by default.
- Composite types (tuples, POJOs, rows) will be flattened by default if 
they are used as top-level records (similar to the old behavior).
- The order of POJO fields is determined by the DataTypeExtractor and 
must not be defined manually anymore.
- GenericTypeInfo is converted to RawType immediately by considering the 
current configuration.
- A DataStream that originated from Table API will keep its DataType 
information due to ExternalTypeInfo implementing DataTypeQueryable.
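
To make the intended mapping concrete, a rough sketch of what these rules imply (`fromInsertStream` is the proposed method name; the commented schema is what the rules above suggest, not verified output):

    // example POJO; fields are extracted by the DataTypeExtractor
    public static class Order {
        public String user;
        public java.math.BigDecimal amount;
    }

    DataStream<Order> orders = env.fromElements(new Order());

    // with the new converter the POJO is flattened into a top-level row:
    //   user   STRING
    //   amount DECIMAL(38, 18)  -- the BigDecimal default from above
    Table table = tEnv.fromInsertStream(orders);
    table.printSchema();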

I would feel safer if we do this under a new method name.

"toDataStream(table, schema.bindTo(DataType))"

This is what I meant with "integrate the DataType into the Schema class 
itself". Yes, we can do that if everybody is fine with it. But why 
should a user specify both a schema and a data type? This means 
potentially duplicate definition of fields and their data types etc.

Regards,
Timo


On 03.09.20 11:31, Danny Chan wrote:
> "It is a more conservative approach to introduce that in a
> new method rather than changing the existing one under the hood and
> potentially break existing pipelines silently”
> 
> I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
> 
> “for `toDataStream` users need to be
> able to express whether they would prefer Row, POJO or atomic”
> 
> I think in most cases people do not need to convert the stream to a Row or POJO, because a table projection always returns a flattened internal row. If people do want a POJO there, how about we bind the DataType to the existing schema, like this
> 
> toDataStream(table, schema.bindTo(DataType))
> 
> Best,
> Danny Chan
> On Sep 3, 2020 at 3:18 PM +0800, dev@flink.apache.org wrote:
>>
>> It is a more conservative approach to introduce that in a
>> new method rather than changing the existing one under the hood and
>> potentially break existing pipelines silently
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
"It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently”

I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.

“for `toDataStream` users need to be
able to express whether they would prefer Row, POJO or atomic”

I think in most cases people do not need to convert the stream to a Row or POJO, because a table projection always returns a flattened internal row. If people do want a POJO there, how about we bind the DataType to the existing schema, like this

toDataStream(table, schema.bindTo(DataType))

Best,
Danny Chan
On Sep 3, 2020 at 3:18 PM +0800, dev@flink.apache.org wrote:
>
> It is a more conservative approach to introduce that in a
> new method rather than changing the existing one under the hood and
> potentially break existing pipelines silently

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Thanks for the nice summary Dawid. I also see the pain points in this 
part of the API. Most users just want to add a time attribute. 
I'm not sure how many projection features we need to have in a 
`fromDataStream`. Users can do column renaming/reordering afterwards in 
a `.select()`.

The only functionality missing is a `system_rowtime()` or 
`system_proctime()`. I don't know if we need to give users the chance to 
also select the location of the attribute, or whether we should always append it at the end.
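
For illustration, the proposed functions would be used roughly like this (a sketch; `fromInsertStream`, `system_rowtime()` and `system_proctime()` exist only in the FLIP draft and their spelling follows it, and the snippet assumes the usual `Expressions.$` import):

    // proposed API sketch: the time attributes are appended at the end of the schema
    Table table = tEnv
        .fromInsertStream(stream)
        .select($("*"), system_rowtime().as("rowtime"), system_proctime().as("proctime"));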

If we go with the new TypeInfo -> DataType mapping as mentioned in the 
FLIP, users don't need to specify all fields for giving POJOs a 
deterministic field order. Most of that would happen automatically. I 
think we can definitely get rid of the column renaming functionality.

@Danny: The renaming of the API is mostly because of the slightly 
different semantics. In particular a different type mapping, different 
field order. It is a more conservative approach to introduce that in a 
new method rather than changing the existing one under the hood and 
potentially break existing pipelines silently.

"Do you mean the physical type of the record ?"

Yes, if we accept a DataStream<T> then T can be just atomic (like 
String), TupleX, Row, POJO. We can derive a lot of information from 
DataStream.getType() but should still give users the chance to influence 
it for `fromDataStream`. However, for `toDataStream` users need to be 
able to express whether they would prefer Row, POJO or atomic. And this 
cannot be expressed with Schema, but only with DataType.
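
Concretely, the kind of choice meant here (a sketch with the proposed `toInsertStream`; `MyPojo` is a placeholder class and the argument order is illustrative):

    // the DataType picks the physical representation of the output
    DataStream<Row> rows     = tEnv.toInsertStream(table);                             // generic Row
    DataStream<MyPojo> pojos = tEnv.toInsertStream(table, DataTypes.of(MyPojo.class)); // structured POJO
    DataStream<String> atoms = tEnv.toInsertStream(table, DataTypes.STRING());         // atomic type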

Regards,
Timo


On 02.09.20 12:23, Danny Chan wrote:
> Thanks Timo ~
> 
> “No this is not possible, because T records have no changeflag. Without a
> changeflag, a ChangelogMode makes not much sense. “
> 
> I agree, but just distinguishing the different ChangelogModes with a renamed API still does not resolve the problem either;
> between an API change and an additional parameter, I would choose the latter.
> 
> “However, with
> a schema you cannot specify the data type of the top-level record
> itself”
> 
> What is a “top-level record”? Do you mean the physical type of the record? From a Schema we can infer its original type though.
> 
> “Is it possible to keep using `fromDataStream(DataStream, Expression…)`”
> 
> From the SQL side, an Expression list usually means a computation (projection), while here we actually want to define the schema of the stream (which is static). Compared to "fromInsertStream(DataStream).select()", they actually indicate the same thing at the API level; although I would not vote for `fromDataStream(DataStream, Expression…)`, it is still better than `fromInsertStream(DataStream).select()`.
> 
> Best,
> Danny Chan
> On Sep 2, 2020 at 4:19 PM +0800, Timo Walther <tw...@apache.org> wrote:
>> Hi everyone
>>
>> thanks for your feedback. It's a lot of content that needs to be
>> digested. I will update the FLIP shortly to incorporate some of the
>> feedback already. But let me respond to some topics first:
>>
>> "not deprecate these API", "the API of the table layer is changing too fast"
>>
>> I agree that deprecating API is definitely not great for users, but in
>> this case I think it is for the greater good: it makes the API more
>> understandable and focused on common use cases for the future. I would
>> rather say that the API is about to settle because there are only a couple
>> of shortcomings left and the bigger picture is clearer than ever. IMO
>> the proposed changes are one of the last bigger API changes on the
>> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
>> methods just because we changed so much in the last releases should not
>> be a reason to keep a confusing API. Users are happy to upgrade if they
>> also get more features by upgrading (e.g. fromChangelogStream).
>>
>> 1. "fromDataStream VS fromInsertStream"
>>
>> The main reason to change this API is to have the possibility to update
>> the type mapping without breaking backwards compatibility. The name
>> `fromInsertStream` makes it possible to have new semantics and makes
>> concepts more explicit by naming.
>>
>> 2. "toAppendStream VS toInsertStream"
>>
>> "Append" is common in the Flink community but the outside world uses
>> "insert". Actually, the term "append-only table" is wrong because SQL
>> tables have bag semantics without any order. So "appending" is more of
>> an "insertion". This is also represented in FLIP-95's `RowKind` where we
>> call the concepts INSERT and `ChangelogKind.insertOnly`.
>>
>> 3. "`.rowtime()` and `.proctime()`"
>>
>> "API is also widely used, even in our test code"
>>
>> The test code is already quite outdated and uses a lot of deprecated
>> API. We need to deal with that with a better testing infrastructure. But
>> this can be future work.
>>
>> "users have already accepted it"
>>
>> I'm not sure if users have already accepted it. I think we get at least
>> one question around this topic every week because users would like to
>> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
>> And specifying all fields just to give the StreamRecord timestamp a name
>> should be made easier. This is necessary in 80% of all use cases.
>>
>> 4. "toAppendStream(Table table, Class<T>/TypeInformation)"
>>
>> The DataType system is way easier than the TypeInformation system
>> because it provides a consistent look and feel with a lot of utilities.
>> E.g. many users didn't know that they can just pass `Row.class` in the
>> past. Actually extracting types from a `Row.class` is not supported by
>> the TypeExtractor (we recently even printed a warning to the logs) but
>> we hacked some logic into the method. With AbstractDataType, users can
>> still use classes via `DataTypes.of`; for example
>> `toInsertStream(DataTypes.of(MyPojo.class))`.
>>
>> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>>
>> Similar to `TableEnvironment.execute()` we made some mistakes during the
>> big refactorings. IMHO tEnv#createTemporaryView was one mistake because
>> we introduced it too quickly. In general this method is correct, but now
>> we cannot change the underlying semantics again without breaking
>> existing pipelines. We could keep this method and just change the type
>> system under the hood, in most of the cases the pipeline should still
>> work but we cannot guarantee this due to slight differences.
>>
>> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>,
>> ChangelogMode)"
>>
>> No this is not possible, because T records have no changeflag. Without a
>> changeflag, a ChangelogMode makes not much sense. That's why
>> `from/toChangelogStream` supports only `Row` whereas the
>> `from/toInsertStream` accepts arbitrary type classes.
>>
>> 7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"
>>
>> I also thought about this method and using `Schema` there. However, with
>> a schema you cannot specify the data type of the top-level record
>> itself. We would need to offer fromDataStream(dataStream, Schema,
>> DataType) or integrate the DataType into the Schema class itself which
>> would mix up the concepts.
>>
>> 8. "name-based setters should always be based on fieldNames"
>>
>> I'm fine with throwing an exception, if my mentioned semantics are too
>> confusing.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 02.09.20 07:25, Jingsong Li wrote:
>>>> a Row has two modes represented by an internal boolean flag
>>>> `hasFieldOrder`
>>>
>>> +1 on Dawid's confusion about what the result is when index-based setters and
>>> name-based setters are mixed.
>>> And name-based setters look like append instead of set.
>>>
>>> It reminds me of Avro's `GenericRecord`; we should support truly random-access
>>> name-based setters instead of append.
>>>
>>> So, what I think is, name-based setters should always be based
>>> on fieldNames just like name-based getters. Otherwise, throw an exception.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yu...@gmail.com> wrote:
>>>
>>>> Timo, Thanks for the discussion
>>>>
>>>> I have only read the "Conversion of DataStream to Table" part so I would
>>>> only put some objections there ~
>>>>
>>>>> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
>>>>
>>>> At first glance, from the perspective of a user, I'm confused about why we
>>>> must distinguish on the API level what a data stream is, e.g. an insert
>>>> stream or some other kind of stream.
>>>>
>>>> A user does not expect to have to distinguish between several
>>>> datastream options. The framework should have the ability to infer the
>>>> ChangelogMode of the stream, but sadly we cannot at the moment, because we
>>>> do not have metadata to describe the ChangelogMode, which is what the
>>>> framework actually needs.
>>>>
>>>> And could it be:
>>>>
>>>> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode) where
>>>> the ChangelogMode is optional because 90% of the datastreams are insert-only
>>>> for now.
>>>>
>>>> or:
>>>>
>>>> DataStream.withChangelogMode(ChangelogMode) so that DataStream can be
>>>> self-describing what kind of stream it is (again, if not specified, the
>>>> default is INSERT).
>>>>
>>>>> tEnv
>>>>> .fromInsertStream(DataStream<T>)
>>>>> .select('*, system_rowtime().as("rowtime"),
>>>> system_proctime().as(“proctime”))
>>>>
>>>> In order to declare the time-attributes on a datastream, I must say I prefer
>>>>
>>>> tEnv.fromDataStream(dataStream, Schema) for these reasons:
>>>>
>>>> - Schema is the uniform interface to declare the metadata for a table in
>>>> the Table/SQL API, with an imperative coding style; in the Descriptor API we
>>>> also use it for the time-attributes purpose
>>>> - Using a projection for time-attributes is not a good idea, because on
>>>> the SQL side we declare them as metadata that is part of the table schema
>>>> when we define the DDL. Although we may explain the DDL internally using
>>>> computed columns, that does not mean we must do that explicitly in the
>>>> DataStream API. In the SQL world, no projection function outputs a
>>>> time-attribute type; we had better keep the time-attributes in the scope of
>>>> the table metadata.
>>>>
>>>> Best,
>>>> Danny Chan
>>>> On Aug 19, 2020 at 4:22 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>> shortcomings in the Table API:
>>>>>
>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>
>>>>> The Table API has received many new features over the last year. It
>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>> soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to DataStream API have not been touched
>>>>> during the introduction of these new features and are kind of outdated.
>>>>> The interfaces lack important functionality that is available in Table
>>>>> API but not exposed to DataStream API users. DataStream API is still our
>>>>> most important API which is why a good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the
>>>>> interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>
>>>
>>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
Thanks Timo ~

“No this is not possible, because T records have no changeflag. Without a
changeflag, a ChangelogMode makes not much sense. “

I agree, but just distinguishing the different ChangelogModes with a renamed API still does not resolve the problem either;
between an API change and an additional parameter, I would choose the latter.

“However, with
a schema you cannot specify the data type of the top-level record
itself”

What is a “top-level record”? Do you mean the physical type of the record? From a Schema we can infer its original type though.

“Is it possible to keep using `fromDataStream(DataStream, Expression…)`”

From the SQL side, an Expression list usually means a computation (projection), while here we actually want to define the schema of the stream (which is static). Compared to "fromInsertStream(DataStream).select()", they actually indicate the same thing at the API level; although I would not vote for `fromDataStream(DataStream, Expression…)`, it is still better than `fromInsertStream(DataStream).select()`.
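
For comparison, the two styles side by side (a sketch; `fromDataStream(DataStream, Expression...)` is the existing API, while `fromInsertStream` and `system_rowtime()` exist only in the FLIP draft; assumes the usual `Expressions.$` import):

    // existing API: the expression list defines the schema of the stream
    Table t1 = tEnv.fromDataStream(stream, $("user"), $("ts").rowtime());

    // proposed API: a plain conversion followed by a projection
    Table t2 = tEnv
        .fromInsertStream(stream)
        .select($("*"), system_rowtime().as("rowtime"));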

Best,
Danny Chan
On Sep 2, 2020 at 4:19 PM +0800, Timo Walther <tw...@apache.org> wrote:
> Hi everyone
>
> thanks for your feedback. It's a lot of content that needs to be
> digested. I will update the FLIP shortly to incorporate some of the
> feedback already. But let me respond to some topics first:
>
> "not deprecate these API", "the API of the table layer is changing too fast"
>
> I agree that deprecating API is definitely not great for users, but in
> this case I think it is for the greater good: it makes the API more
> understandable and focused on common use cases for the future. I would
> rather say that the API is about to settle because there are only a couple
> of shortcomings left and the bigger picture is clearer than ever. IMO
> the proposed changes are one of the last bigger API changes on the
> roadmap. I cannot see other bigger refactorings in the mid-term. Keeping
> methods just because we changed so much in the last releases should not
> be a reason to keep a confusing API. Users are happy to upgrade if they
> also get more features by upgrading (e.g. fromChangelogStream).
>
> 1. "fromDataStream VS fromInsertStream"
>
> The main reason to change this API is to have the possibility to update
> the type mapping without breaking backwards compatibility. The name
> `fromInsertStream` makes it possible to have new semantics and makes
> concepts more explicit by naming.
>
> 2. "toAppendStream VS toInsertStream"
>
> "Append" is common in the Flink community but the outside world uses
> "insert". Actually, the term "append-only table" is wrong because SQL
> tables have bag semantics without any order. So "appending" is more of
> an "insertion". This is also represented in FLIP-95's `RowKind` where we
> call the concepts INSERT and `ChangelogKind.insertOnly`.
>
> 3. "`.rowtime()` and `.proctime()`"
>
> "API is also widely used, even in our test code"
>
> The test code is already quite outdated and uses a lot of deprecated
> API. We need to deal with that with a better testing infrastructure. But
> this can be future work.
>
> "users have already accepted it"
>
> I'm not sure if users have already accepted it. I think we get at least
> one question around this topic every week because users would like to
> call `.rowtime` on arbitrary timestamps in the middle of the pipeline.
> And specifying all fields just to give the StreamRecord timestamp a name
> should be made easier. This is necessary in 80% of all use cases.
>
> 4. "toAppendStream(Table table, Class<T>/TypeInformation)"
>
> The DataType system is way easier than the TypeInformation system
> because it provides a consistent look and feel with a lot of utilities.
> E.g. many users didn't know that they can just pass `Row.class` in the
> past. Actually extracting types from a `Row.class` is not supported by
> the TypeExtractor (we recently even printed a warning to the logs) but
> we hacked some logic into the method. With AbstractDataType, users can
> still use classes via `DataTypes.of`; for example
> `toInsertStream(DataTypes.of(MyPojo.class))`.
>
> 5. "tEnv#createTemporaryView was introduced in release-1.10"
>
> Similar to `TableEnvironment.execute()` we made some mistakes during the
> big refactorings. IMHO tEnv#createTemporaryView was one mistake because
> we introduced it too quickly. In general this method is correct, but now
> we cannot change the underlying semantics again without breaking
> existing pipelines. We could keep this method and just change the type
> system under the hood; in most cases the pipeline should still
> work, but we cannot guarantee this due to slight differences.
>
> 6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>,
> ChangelogMode)"
>
> No this is not possible, because T records have no changeflag. Without a
> changeflag, a ChangelogMode makes not much sense. That's why
> `from/toChangelogStream` supports only `Row` whereas the
> `from/toInsertStream` accepts arbitrary type classes.
>
> 7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"
>
> I also thought about this method and using `Schema` there. However, with
> a schema you cannot specify the data type of the top-level record
> itself. We would need to offer fromDataStream(dataStream, Schema,
> DataType) or integrate the DataType into the Schema class itself which
> would mix up the concepts.
>
> 8. "name-based setters should always be based on fieldNames"
>
> I'm fine with throwing an exception, if my mentioned semantics are too
> confusing.
>
> Regards,
> Timo
>
>
>
> On 02.09.20 07:25, Jingsong Li wrote:
> > > a Row has two modes represented by an internal boolean flag
> > > `hasFieldOrder`
> >
> > +1 on Dawid's confusion about what the result is when index-based setters and
> > name-based setters are mixed.
> > And name-based setters look like append instead of set.
> >
> > It reminds me of Avro's `GenericRecord`; we should support truly random-access
> > name-based setters instead of append.
> >
> > So, what I think is, name-based setters should always be based
> > on fieldNames just like name-based getters. Otherwise, throw an exception.
> >
> > Best,
> > Jingsong
> >
> > On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yu...@gmail.com> wrote:
> >
> > > Timo, Thanks for the discussion
> > >
> > > I have only read the "Conversion of DataStream to Table" part so I would
> > > only put some objections there ~
> > >
> > > > StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
> > >
> > > At first glance, from the perspective of a user, I'm confused about why we
> > > must distinguish on the API level what a data stream is, e.g. an insert
> > > stream or some other kind of stream.
> > >
> > > A user does not expect to have to distinguish between several
> > > datastream options. The framework should have the ability to infer the
> > > ChangelogMode of the stream, but sadly we cannot at the moment, because we
> > > do not have metadata to describe the ChangelogMode, which is what the
> > > framework actually needs.
> > >
> > > And could it be:
> > >
> > > StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode) where
> > > the ChangelogMode is optional because 90% of the datastreams are insert-only
> > > for now.
> > >
> > > or:
> > >
> > > DataStream.withChangelogMode(ChangelogMode) so that DataStream can be
> > > self-describing what kind of stream it is (again, if not specified, the
> > > default is INSERT).
> > >
> > > > tEnv
> > > > .fromInsertStream(DataStream<T>)
> > > > .select('*, system_rowtime().as("rowtime"),
> > > system_proctime().as(“proctime”))
> > >
> > > In order to declare the time-attributes on a datastream, I must say I prefer
> > >
> > > tEnv.fromDataStream(dataStream, Schema) for these reasons:
> > >
> > > - Schema is the uniform interface to declare the metadata for a table in
> > > the Table/SQL API, with an imperative coding style; in the Descriptor API we
> > > also use it for the time-attributes purpose
> > > - Using a projection for time-attributes is not a good idea, because on
> > > the SQL side we declare them as metadata that is part of the table schema
> > > when we define the DDL. Although we may explain the DDL internally using
> > > computed columns, that does not mean we must do that explicitly in the
> > > DataStream API. In the SQL world, no projection function outputs a
> > > time-attribute type; we had better keep the time-attributes in the scope of
> > > the table metadata.
> > >
> > > Best,
> > > Danny Chan
> > > On Aug 19, 2020 at 4:22 PM +0800, Timo Walther <tw...@apache.org> wrote:
> > > > Hi everyone,
> > > >
> > > > I would like to propose a FLIP that aims to resolve the remaining
> > > > shortcomings in the Table API:
> > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > > >
> > > > The Table API has received many new features over the last year. It
> > > > supports a new type system (FLIP-37), connectors support changelogs
> > > > (FLIP-95), we have well defined internal data structures (FLIP-95),
> > > > support for result retrieval in an interactive fashion (FLIP-84), and
> > > > soon new TableDescriptors (FLIP-129).
> > > >
> > > > However, the interfaces from and to DataStream API have not been touched
> > > > during the introduction of these new features and are kind of outdated.
> > > > The interfaces lack important functionality that is available in Table
> > > > API but not exposed to DataStream API users. DataStream API is still our
> > > > most important API which is why a good interoperability is crucial.
> > > >
> > > > This FLIP is a mixture of different topics that improve the
> > > > interoperability between DataStream and Table API in terms of:
> > > >
> > > > - DataStream <-> Table conversion
> > > > - translation of type systems TypeInformation <-> DataType
> > > > - schema definition (incl. rowtime, watermarks, primary key)
> > > > - changelog handling
> > > > - row handling in DataStream API
> > > >
> > > > I'm looking forward to your feedback.
> > > >
> > > > Regards,
> > > > Timo
> > >
> >
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi everyone

thanks for your feedback. It's a lot of content that needs to be 
digested. I will update the FLIP shortly to incorporate some of the 
feedback already. But let me respond to some topics first:

"not deprecate these API", "the API of the table layer is changing too fast"

I agree that deprecating API is definitely not great for users, but in 
this case I think it is for the greater good: it makes the API more 
understandable and focused on common use cases for the future. I would 
rather say that the API is about to settle because there are only a couple 
of shortcomings left and the bigger picture is clearer than ever. IMO 
the proposed changes are one of the last bigger API changes on the 
roadmap. I cannot see other bigger refactorings in the mid-term. Keeping 
methods just because we changed so much in the last releases should not 
be a reason to keep a confusing API. Users are happy to upgrade if they 
also get more features by upgrading (e.g. fromChangelogStream).

1. "fromDataStream VS fromInsertStream"

The main reason to change this API is to have the possibility to update 
the type mapping without breaking backwards compatibility. The name 
`fromInsertStream` makes it possible to have new semantics and makes 
concepts more explicit by naming.

2. "toAppendStream VS toInsertStream"

"Append" is common in the Flink community but the outside world uses 
"insert". Actually, the term "append-only table" is wrong because SQL 
tables have bag semantics without any order. So "appending" is more of 
an "insertion". This is also represented in FLIP-95's `RowKind` where we 
call the concepts INSERT and `ChangelogKind.insertOnly`.

3. "`.rowtime()` and `.proctime()`"

"API is also widely used, even in our test code"

The test code is already quite outdated and uses a lot of deprecated 
API. We need to deal with that with a better testing infrastructure. But 
this can be future work.

"users have already accepted it"

I'm not sure if users have already accepted it. I think we get at least 
one question around this topic every week because users would like to 
call `.rowtime` on arbitrary timestamps in the middle of the pipeline. 
And specifying all fields just to give the StreamRecord timestamp a name 
should be made easier. This is necessary in 80% of all use cases.

4. "toAppendStream(Table table, Class<T>/TypeInformation)"

The DataType system is way easier than the TypeInformation system 
because it provides a consistent look and feel with a lot of utilities. 
E.g. many users didn't know that they can just pass `Row.class` in the 
past. Actually extracting types from a `Row.class` is not supported by 
the TypeExtractor (we recently even printed a warning to the logs) but 
we hacked some logic into the method. With AbstractDataType, users can 
still use classes via `DataTypes.of`; for example 
`toInsertStream(DataTypes.of(MyPojo.class))`.
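
A minimal before/after sketch (`toInsertStream` is the proposed method; `MyPojo` stands for any user POJO):

    // existing API, Class-based:
    DataStream<MyPojo> before = tEnv.toAppendStream(table, MyPojo.class);

    // proposed API, DataType-based:
    DataStream<MyPojo> after = tEnv.toInsertStream(table, DataTypes.of(MyPojo.class));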

5. "tEnv#createTemporaryView was introduced in release-1.10"

Similar to `TableEnvironment.execute()` we made some mistakes during the 
big refactorings. IMHO tEnv#createTemporaryView was one mistake because 
we introduced it too quickly. In general this method is correct, but now 
we cannot change the underlying semantics again without breaking 
existing pipelines. We could keep this method and just change the type 
system under the hood, in most of the cases the pipeline should still 
work but we cannot guarantee this due to slight differences.

6. "could it be "StreamTableEnvironment.fromDataStream(DataStream<T>, 
ChangelogMode)"

No this is not possible, because T records have no changeflag. Without a 
changeflag, a ChangelogMode makes not much sense. That's why 
`from/toChangelogStream` supports only `Row` whereas the 
`from/toInsertStream` accepts arbitrary type classes.
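
In other words, a changelog stream carries its change flag inside each record (a sketch; `fromChangelogStream` is the proposed method, while `RowKind` and `Row#setKind` are existing FLIP-95 API):

    // each Row carries a RowKind change flag
    Row insert = Row.of("Alice", 12);   // RowKind.INSERT by default
    Row delete = Row.of("Alice", 12);
    delete.setKind(RowKind.DELETE);

    DataStream<Row> changelog = env.fromElements(insert, delete);

    // proposed API: only DataStream<Row> can be interpreted as a changelog
    Table table = tEnv.fromChangelogStream(changelog);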

7. "i must say I prefer tEnv.fromDataStream(dataStream, Schema)"

I also thought about this method and using `Schema` there. However, with 
a schema you cannot specify the data type of the top-level record 
itself. We would need to offer fromDataStream(dataStream, Schema, 
DataType) or integrate the DataType into the Schema class itself which 
would mix up the concepts.

8. "name-based setters should always be based on fieldNames"

I'm fine with throwing an exception, if my mentioned semantics are too 
confusing.

Regards,
Timo



On 02.09.20 07:25, Jingsong Li wrote:
>> a Row has two modes represented by an internal boolean flag
>> `hasFieldOrder`
> 
> +1 on Dawid's confusion about what the result is when index-based setters and
> name-based setters are mixed.
> And name-based setters look like append instead of set.
> 
> It reminds me of Avro's `GenericRecord`; we should support truly random-access
> name-based setters instead of append.
> 
> So, what I think is, name-based setters should always be based
> on fieldNames just like name-based getters. Otherwise, throw an exception.
> 
> Best,
> Jingsong
> 
> On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yu...@gmail.com> wrote:
> 
>> Timo, Thanks for the discussion
>>
>> I have only read the "Conversion of DataStream to Table" part so I would
>> only put some objections there ~
>>
>>> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
>>
>> At first glance, from the perspective of a user, I'm confused about why we
>> must distinguish on the API level what a data stream is, e.g. an insert
>> stream or some other kind of stream.
>>
>> A user does not expect to have to distinguish between several
>> datastream options. The framework should have the ability to infer the
>> ChangelogMode of the stream, but sadly we cannot at the moment, because we
>> do not have metadata to describe the ChangelogMode, which is what the
>> framework actually needs.
>>
>> And could it be:
>>
>> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode) where
>> the ChangelogMode is optional because 90% of the datastreams are insert-only
>> for now.
>>
>> or:
>>
>> DataStream.withChangelogMode(ChangelogMode) so that DataStream can be
>> self-describing what kind of stream it is (again, if not specified, the
>> default is INSERT).
>>
>>> tEnv
>>> .fromInsertStream(DataStream<T>)
>>> .select('*, system_rowtime().as("rowtime"),
>> system_proctime().as(“proctime”))
>>
>> In order to declare the time-attributes on a datastream, I must say I prefer
>>
>> tEnv.fromDataStream(dataStream, Schema) for these reasons:
>>
>> - Schema is the uniform interface to declare the metadata for a table in
>> the Table/SQL API, with an imperative coding style; in the Descriptor API we
>> also use it for the time-attributes purpose
>> - Using a projection for time-attributes is not a good idea, because on
>> the SQL side we declare them as metadata that is part of the table schema
>> when we define the DDL. Although we may explain the DDL internally using
>> computed columns, that does not mean we must do that explicitly in the
>> DataStream API. In the SQL world, no projection function outputs a
>> time-attribute type; we had better keep the time-attributes in the scope of
>> the table metadata.
>>
>> Best,
>> Danny Chan
>> On Aug 19, 2020 at 4:22 PM +0800, Timo Walther <tw...@apache.org> wrote:
>>> Hi everyone,
>>>
>>> I would like to propose a FLIP that aims to resolve the remaining
>>> shortcomings in the Table API:
>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>
>>> The Table API has received many new features over the last year. It
>>> supports a new type system (FLIP-37), connectors support changelogs
>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>> soon new TableDescriptors (FLIP-129).
>>>
>>> However, the interfaces from and to DataStream API have not been touched
>>> during the introduction of these new features and are kind of outdated.
>>> The interfaces lack important functionality that is available in Table
>>> API but not exposed to DataStream API users. DataStream API is still our
>>> most important API which is why a good interoperability is crucial.
>>>
>>> This FLIP is a mixture of different topics that improve the
>>> interoperability between DataStream and Table API in terms of:
>>>
>>> - DataStream <-> Table conversion
>>> - translation of type systems TypeInformation <-> DataType
>>> - schema definition (incl. rowtime, watermarks, primary key)
>>> - changelog handling
>>> - row handling in DataStream API
>>>
>>> I'm looking forward to your feedback.
>>>
>>> Regards,
>>> Timo
>>
> 
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jingsong Li <ji...@gmail.com>.
> a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`

+1 on Dawid's confusion about what the result is when index-based setters and
name-based setters are mixed.
And name-based setters look like append instead of set.

It reminds me of Avro's `GenericRecord`; we should support truly random-access
name-based setters instead of append.

So, what I think is, name-based setters should always be based
on fieldNames just like name-based getters. Otherwise, throw an exception.
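
A sketch of these stricter semantics (hypothetical behavior; the Map-based constructor follows the FLIP draft's `Row#setFieldNames(Map<String, Integer>)` idea and is not existing API):

    Map<String, Integer> fieldNames = new HashMap<>();
    fieldNames.put("myName", 0);
    fieldNames.put("myAge", 1);

    Row row = new Row(2, fieldNames);  // hypothetical constructor
    row.setField("myName", "Alice");   // resolves to position 0 via fieldNames
    row.setField("myAge", 32);         // resolves to position 1
    row.setField("unknown", 42);       // under this proposal: throws an exception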

Best,
Jingsong

On Wed, Sep 2, 2020 at 12:43 PM Danny Chan <yu...@gmail.com> wrote:

> Timo, Thanks for the discussion
>
> I have only read the "Conversion of DataStream to Table" part so I would
> only put some objections there ~
>
> > StreamTableEnvironment.fromInsertStream(DataStream<T>): Table
>
> At first glance, from the perspective of a user, I'm confused about why we
> must distinguish on the API level what a data stream is, e.g. an insert
> stream or some other kind of stream.
>
> A user does not expect to have to distinguish between several
> datastream options. The framework should have the ability to infer the
> ChangelogMode of the stream, but sadly we cannot at the moment, because we
> do not have metadata to describe the ChangelogMode, which is what the
> framework actually needs.
>
> And could it be:
>
> StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode) where
> the ChangelogMode is optional because 90% of the datastreams are insert-only
> for now.
>
> or:
>
> DataStream.withChangelogMode(ChangelogMode) so that DataStream can be
> self-describing what kind of stream it is (again, if not specified, the
> default is INSERT).
>
> > tEnv
> >.fromInsertStream(DataStream<T>)
> >.select('*, system_rowtime().as("rowtime"),
> system_proctime().as(“proctime”))
>
> In order to declare the time-attributes on a datastream, I must say I prefer
>
> tEnv.fromDataStream(dataStream, Schema) for these reasons:
>
> - Schema is the uniform interface to declare the metadata for a table in
> the Table/SQL API, with an imperative coding style; in the Descriptor API we
> also use it for the time-attributes purpose
> - Using a projection for time-attributes is not a good idea, because on
> the SQL side we declare them as metadata that is part of the table schema
> when we define the DDL. Although we may explain the DDL internally using
> computed columns, that does not mean we must do that explicitly in the
> DataStream API. In the SQL world, no projection function outputs a
> time-attribute type; we had better keep the time-attributes in the scope of
> the table metadata.
>
> Best,
> Danny Chan
> On Aug 19, 2020 at 4:22 PM +0800, Timo Walther <tw...@apache.org> wrote:
> > Hi everyone,
> >
> > I would like to propose a FLIP that aims to resolve the remaining
> > shortcomings in the Table API:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >
> > The Table API has received many new features over the last year. It
> > supports a new type system (FLIP-37), connectors support changelogs
> > (FLIP-95), we have well defined internal data structures (FLIP-95),
> > support for result retrieval in an interactive fashion (FLIP-84), and
> > soon new TableDescriptors (FLIP-129).
> >
> > However, the interfaces from and to DataStream API have not been touched
> > during the introduction of these new features and are kind of outdated.
> > The interfaces lack important functionality that is available in Table
> > API but not exposed to DataStream API users. DataStream API is still our
> > most important API which is why a good interoperability is crucial.
> >
> > This FLIP is a mixture of different topics that improve the
> > interoperability between DataStream and Table API in terms of:
> >
> > - DataStream <-> Table conversion
> > - translation of type systems TypeInformation <-> DataType
> > - schema definition (incl. rowtime, watermarks, primary key)
> > - changelog handling
> > - row handling in DataStream API
> >
> > I'm looking forward to your feedback.
> >
> > Regards,
> > Timo
>


-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Danny Chan <yu...@gmail.com>.
Timo, Thanks for the discussion

I have only read the "Conversion of DataStream to Table" part so I would only put some objections there ~

> StreamTableEnvironment.fromInsertStream(DataStream<T>): Table

At first glance, from the perspective of a user, I'm confused about why we must distinguish on the API level what a data stream is, e.g. an insert stream or some other kind of stream.

A user does not expect to have to distinguish between several datastream options. The framework should have the ability to infer the ChangelogMode of the stream, but sadly we cannot at the moment, because we do not have metadata to describe the ChangelogMode, which is what the framework actually needs.

And could it be:

StreamTableEnvironment.fromDataStream(DataStream<T>, ChangelogMode) where the ChangelogMode is optional because 90% of the datastreams are insert-only for now.

or:

DataStream.withChangelogMode(ChangelogMode) so that DataStream can be self-describing what kind of stream it is (again, if not specified, the default is INSERT).

> tEnv
>.fromInsertStream(DataStream<T>)
>.select('*, system_rowtime().as("rowtime"), system_proctime().as(“proctime”))

In order to declare the time-attributes on a datastream, I must say I prefer

tEnv.fromDataStream(dataStream, Schema) for these reasons:

- Schema is the uniform interface to declare the metadata for a table in the Table/SQL API, with an imperative coding style; in the Descriptor API we also use it for the time-attributes purpose
- Using a projection for time-attributes is not a good idea, because on the SQL side we declare them as metadata that is part of the table schema when we define the DDL. Although we may explain the DDL internally using computed columns, that does not mean we must do that explicitly in the DataStream API. In the SQL world, no projection function outputs a time-attribute type; we had better keep the time-attributes in the scope of the table metadata. A sketch of this follows below.
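
A sketch of what the Schema-based declaration could look like (hypothetical builder syntax; the exact Schema API is still to be defined by the FLIP):

    // hypothetical Schema-based declaration of an event-time attribute
    Table table = tEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("user", DataTypes.STRING())
            .column("ts", DataTypes.TIMESTAMP(3))
            .watermark("ts", "ts - INTERVAL '5' SECOND")
            .build());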

Best,
Danny Chan
On Aug 19, 2020 at 4:22 PM +0800, Timo Walther <tw...@apache.org> wrote:
> Hi everyone,
>
> I would like to propose a FLIP that aims to resolve the remaining
> shortcomings in the Table API:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
> The Table API has received many new features over the last year. It
> supports a new type system (FLIP-37), connectors support changelogs
> (FLIP-95), we have well defined internal data structures (FLIP-95),
> support for result retrieval in an interactive fashion (FLIP-84), and
> soon new TableDescriptors (FLIP-129).
>
> However, the interfaces from and to DataStream API have not been touched
> during the introduction of these new features and are kind of outdated.
> The interfaces lack important functionality that is available in Table
> API but not exposed to DataStream API users. DataStream API is still our
> most important API which is why a good interoperability is crucial.
>
> This FLIP is a mixture of different topics that improve the
> interoperability between DataStream and Table API in terms of:
>
> - DataStream <-> Table conversion
> - translation of type systems TypeInformation <-> DataType
> - schema definition (incl. rowtime, watermarks, primary key)
> - changelog handling
> - row handling in DataStream API
>
> I'm looking forward to your feedback.
>
> Regards,
> Timo

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Jingsong raised a good point. We need to be more careful when deprecating
APIs.
For example, tEnv#createTemporaryView was introduced in release-1.10; users
became familiar with this API in the previous release, but now we want to
deprecate it in the next release.

I also have some concerns about deprecating `.rowtime()`, `.proctime()`. I
agree it's a misunderstanding to apply expressions on non-existent fields.
However, these APIs have been around since the early days of the Table API
and are highly used.
So I think the misunderstanding shouldn't be a big problem: users have
already accepted it, and `.rowtime()` and `.proctime()` are a more
fluent API.
Dropping a highly used API and educating users to learn a new one will hurt
users a lot. Could we keep the old API and introduce the new one (which is
the advanced one)?

In a word, I'm +1 to keep the `fromDataStream` which is more
straightforward than `fromInsertStream` for batch users and most streaming
users.
Besides, if we want to have a corresponding thing on the sink side, maybe
we can have `toDataStream` and deprecate `toAppendStream`.

Best,
Jark

On Wed, 2 Sep 2020 at 11:55, Jingsong Li <ji...@gmail.com> wrote:

> Thanks Timo for driving.
>
> My first impression is, can we not deprecate these APIs?
> - StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> - StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...):
> Table
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>,
> Expression...): Unit
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit
> - StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz):
> DataStream<T>
> - StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T>
> typeInfo): DataStream<T>
>
> I think they may also be commonly used APIs. My intuitive feeling is that
> the API of the table layer is changing too fast, and there are a lot of
> changes in each version. Even if there is a "deprecated", they will be
> removed one day. We can avoid the change unless there's a strong reason.
>
> 1.fromDataStream VS fromInsertStream:
> - In big data systems, or in our previous designs, APIs, including DDL, the
> default is pure insert. Subsequent CDC, upsert, and delete are all
> supplementary extension capabilities. Therefore, by default, it is insert,
> which is known and familiar to users. So I think, "fromDataStream" can be
> as it is.
>
> 2.toAppendStream VS toInsertStream:
> - What is the difference between append and insert? I don't think there is
> a clear distinction between them in our daily discussions. For me, Append
> is OK.
>
> 3.Calling `.rowtime()` and `.proctime()` on fields that don't exist caused
> further misunderstandings
> - This API is also widely used, even in our test code. Although we have
> introduced DDL, our test code has not been switched.
> - Regarding the misunderstandings: can we remove them by modifying the
> behavior? For example, duplicate names could be disallowed. As far as I know,
> using a new column name is the most common case.
>
> 4.toAppendStream(Table table, Class<T>/TypeInformation)
> - I know an AbstractDataType is more powerful, but I think a simple class
> or TypeInformation is easier for DataStream users to accept; it is simpler,
> and they don't have to take care of the DataType.
>
> I don't have a strong opinion on these, but I feel it's best not to have an
> impact on non-CDC users.
>
> Best,
> Jingsong
>
> On Tue, Sep 1, 2020 at 9:10 PM Timo Walther <tw...@apache.org> wrote:
>
> > Thanks for the healthy discussion Jark and Dawid.
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > Yes, I'm concerned about the per-record performance. A converter
> > or serializer should prepare an immutable Map instance beforehand (stored in
> > a member variable) that is simply passed to every new Row instance.
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> >
> > The accumulator code in the FLIP is just an example; sure, in this
> > example we could use a POJO. But in general it should also be easy for
> > DataStream API users to quickly create a Row and use names instead of
> > indices for code readability.
> >
> > I think we should not add too much validation to the setters to keep the
> > runtime overhead low.
> >
> > Users should not mix position-based and string-based setters if they
> > construct rows themselves. If they do, the result depends on the calling
> > order. IMHO this should be straightforward once the concept is clear.
> >
> > Row row = new Row(2);
> > row.setField(0, "ABC"); // always position 0
> > row.setField(1, "ABC"); // always position 1
> > row.setField("f1", "ABC"); // position 0 because first usage of "f1"
> > row.setField("f0", "ABC"); // position 1 because first usage of "f0"
> > row.setField("f1", "ABC"); // position 0 because second usage of "f1"
> >
> > Row row = new Row(2);
> > row.setField("f0", "ABC"); // position 0 because first usage of "f0"
> > row.setField(0, "ABC");    // always position 0
> >
> > Row row = new Row(2, fieldNames);
> > row.setField(0, "ABC"); // always position 0
> > row.setField("f1", "ABC"); // position defined by fieldNames
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 14:51, Jark Wu wrote:
> > > Hi Timo,
> > >
> > > Thanks for the quick response.
> > >
> > > 5. "StreamStatementSet#attachToStream()"
> > > Joining or using connect() with a different DataStream is a good case.
> > > cc @Godfrey , what do you think about the `attachToStream()` API?
> > >
> > > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >> We need a Map for constant time of mapping field name to index.
> > > But we can easily build the Map from the List<String> fieldNames in the Row
> > > constructor.
> > > IMO, manually building the Map and mapping names to indices is verbose
> > > and error-prone.
> > > Are you concerned about the per-record performance?
> > >
> > > 7. "a Row has two modes represented by an internal boolean flag
> > > `hasFieldOrder`."
> > > Thanks for the explanation.
> > > Regarding the case (b), I have the same confusion as Dawid about what's
> > > the result when index-based setters and name-based setters are mixed
> > > (esp. in foreach and if branches).
> > > TBH, I don't see a strong need for named setters. Using it as the UDAF
> > > accumulator is not as good as a POJO in terms of performance and ease of
> > > use.
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I really like the ideas of this FLIP. I think it improves user
> > >> experience quite a bit. I wanted to add just two comments:
> > >>
> > >> 1. As for the StatementSet I like the approach described in the FLIP
> for
> > >> its simplicity. Moreover the way I see it is that if a user wants to
> > >> work with DataStream, then he/she wants to end up in the DataStream
> API,
> > >> or in other words call the StreamExecutionEnvironment#execute.
> > >>
> > >> 2. @Timo What is the interaction between Row setters from the
> different
> > >> modes? What happens if the user calls both in a different order? E.g.
> > >>
> > >> row.setField(0, "ABC");
> > >>
> > >> row.setField("f0", "ABC"); // is this a valid call ?
> > >>
> > >> or
> > >>
> > >> row.setField("f0", "ABC");
> > >>
> > >> row.setField(0, "ABC"); // is this a valid call ?
> > >>
> > >> or
> > >>
> > >> row.setFieldNames(...);
> > >>
> > >> row.setField(0, "ABC"); // is this a valid call ?
> > >>
> > >> Best,
> > >>
> > >> Dawid
> > >>
> > >> On 01/09/2020 11:49, Timo Walther wrote:
> > >>> Hi Jark,
> > >>>
> > >>> thanks for the detailed review. Let me answer your concerns:
> > >>>
> > >>> ## Conversion of DataStream to Table
> > >>>
> > >>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > >>> leaf of a QueryOperation tree in the validation phase."
> > >>> I'm fine with allowing `system_proctime` everywhere in the query.
> Also
> > >>> for SQL, I think we should have done that earlier already to give
> > >>> users the chance to have time based operations also at later stages.
> > >>>
> > >>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > >>> assigned implicitly. "
> > >>> Yes, we just use the DataStream API watermark. `system_rowtime()`
> will
> > >>> just introduce a time attribute, the watermark travels to the Table
> > >>> API and into DataStream API without further code changes.
> > >>>
> > >>> ## Conversion of Table to DataStream
> > >>>
> > >>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > >>> DataStream<Row>"
> > >>> 4. "Table.execute(ChangelogMode)"
> > >>> Filtering UPDATE_BEFORE is already quite important as it reduces the
> > >>> amount of data by a factor of 2. But I also understand your concerns
> > >>> regarding confusing users. I also got the request for a
> > >>> `Table.getChangelogMode()` a couple of times in the past, because
> > >>> users would like to get information about the kind of query that is
> > >>> executed. However, in this case `toChangelogStream(Table)` is
> > >>> equivalent to call ``toChangelogStream(Table.getChangelogMode(),
> > >>> Table)` so we don't need `Table.getChangelogMode()` in the current
> > >>> FLIP design. But this can be future work. Let's start with
> > >>> `toChangelogStream(Table)` and wait for more feedback about this new
> > >>> feature. What do others think?
> > >>>
> > >>> ## Conversion of StatementSet to DataStream API
> > >>>
> > >>> 5. "StreamStatementSet#attachToStream()"
> > >>>
> > >>> I think Godfrey's proposal is too complex for regular users. Instead
> > >>> of continuing with the fluent programming, we would force users to
> > >>> define a DataStream pipeline in a lambda.
> > >>>
> > >>> Furthermore, joining or using connect() with a different DataStream
> > >>> source would not be possible in this design.
> > >>>
> > >>> The `execute()` method of `StatementSet` should not execute the
> > >>> DataStream API subprogram. It mixes the concepts because we tell
> > >>> users: "If you use toDataStream" you need to use
> > >>> `StreamExecutionEnvironment.execute()`.
> > >>>
> > >>> We don't solve every potential use case with the current FLIP design
> > >>> but the most important one where a pipeline just uses an INSERT INTO
> > >>> but also uses Table API for connectors and preprocessing and does the
> > >>> main logic in DataStream API:
> > >>>
> > >>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
> > >>>
> > >>> I would consider `StatementSet.addDataStream(Table, ...)` future work
> > >>> for now as it is only an optimization for reusing parts of the
> > >>> StreamGraph. We could even perform this optimization when calling
> > >>> `toInsertStream` or `toChangelogStream`.
> > >>>
> > >>> ## Improve dealing with Row in DataStream API
> > >>>
> > >>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>
> > >>> We need a Map for constant-time mapping of field name to index.
> > >>>
> > >>> We accept a nullable `fieldNames` because names are not mandatory;
> > >>> one can also work with indices as before.
> > >>>
> > >>> But you are right that the fieldNames member variable can be
> > >>> immutable. I just wanted to avoid too many overloaded constructors.
> > >>> I'm fine with having one full constructor for RowKind, arity and
> field
> > >>> names (or null).
> > >>>
> > >>> 7. "a Row has two modes represented by an internal boolean flag
> > >>> `hasFieldOrder`."
> > >>> Maybe I leaked too many implementation details there that rather
> > >>> confuse readers than help. Internally, we need to distinguish between
> > >>> two kinds of rows. A user should not be bothered by this.
> > >>>
> > >>> a) Row comes from Table API runtime: hasFieldOrder = true
> > >>> Map("myAge" -> 0, "myName" -> 1)
> > >>>
> > >>> row.getField("myName") == row.getField(1)
> > >>> row.getField("myAge") == row.getField(0)
> > >>>
> > >>> b) Row comes from user: hasFieldOrder = false
> > >>> Row row = new Row(2);
> > >>> row.setField("myName", "Alice");
> > >>> row.setField("myAge", 32);
> > >>>
> > >>> Map("myAge" -> 1, "myName" -> 0)
> > >>>
> > >>> But the type information will decide about the order of the fields
> > >>> later and reorder them accordingly during serialization or RowData
> > >>> conversion:
> > >>>
> > >>> ["myName", "myAge"] vs. ["myAge", "myName"]
> > >>>
> > >>> The user does not need to care about this as it always feels
> > >>> natural to deal with the rows.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>>
> > >>> On 01.09.20 06:19, Jark Wu wrote:
> > >>>> Hi Timo,
> > >>>>
> > >>>> Thanks a lot for the great proposal and sorry for the late reply.
> > >>>> This is
> > >>>> an important improvement for DataStream and Table API users.
> > >>>>
> > >>>> I have listed my thoughts and questions below ;-)
> > >>>>
> > >>>> ## Conversion of DataStream to Table
> > >>>>
> > >>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > >>>> leaf of
> > >>>> a QueryOperation tree in the validation phase."
> > >>>> IIUC, that means `system_rowtime()` can only be used in the first
> > >>>> `select()` after `fromXxxStream()`, right?
> > >>>> However, I think `system_proctime()` shouldn't have this limitation,
> > >>>> because it doesn't rely on the underlying timestamp of StreamRecord
> > and
> > >>>>    can be generated in any stage of the query.
> > >>>>
> > >>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > >>>> assigned implicitly. "
> > >>>> What watermark will be used here? Is the pre-assigned watermark in
> the
> > >>>> DataStream (so-called `system_watermark()`)?
> > >>>>
> > >>>> ## Conversion of Table to DataStream
> > >>>>
> > >>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > >>>> DataStream<Row>"
> > >>>> I'm not sure whether this method is useful for users. Currently, the
> > >>>> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
> > >>>> filtering UPDATE_BEFORE if possible.
> > >>>> However, if we expose this method to users, it may be confusing.
> > >>>> Users may
> > >>>> try to use this method to convert a changelog stream to an
> insert-only
> > >>>> stream by applying ChangelogMode.insertOnly(). This might be
> > misleading.
> > >>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> > >>>> have
> > >>>> to know the ChangelogMode of the current Table first, and remove
> > >>>> UPDATE_BEFORE from the ChangelogMode.
> > >>>> That means we have to support `Table.getChangelogMode()` first? But
> > >>>> `ChangelogMode` derivation requires a full optimization path on the
> > >>>> Table,
> > >>>> which seems impossible now.
> > >>>> Therefore, IMHO, we can introduce this interface in the future if
> > users
> > >>>> indeed need this. For most users, I think `toChangelogStream(Table)`
> > is
> > >>>> enough.
> > >>>>
> > >>>> 4. "Table.execute(ChangelogMode)"
> > >>>> Ditto.
> > >>>>
> > >>>> ## Conversion of StatementSet to DataStream API
> > >>>>
> > >>>> 5. "StreamStatementSet#attachToStream()"
> > >>>> I think the potential drawback is that it can't support multi-sink
> > >>>> optimization, i.e. sharing the pipeline.
> > >>>> For example, say we have a Table `t1` (a heavy view using joins
> > >>>> and aggregates), we want to sink it to "mysql" using SQL, and we
> > >>>> want to continue processing using DataStream in the same job.
> > >>>> It's a huge waste of resources if we re-compute `t1`. It would be
> > >>>> nice if
> > >>>> we can come up with a solution to share the pipeline.
> > >>>>
> > >>>> I borrowed Godfrey's idea in FLINK-18840 and added some
> > >>>> modifications. What
> > >>>> do you think about the following proposal?
> > >>>>
> > >>>> interface StatementSet {
> > >>>>      StatementSet addDataStream(Table table,
> TableDataStreamTransform
> > >>>> transform);
> > >>>> }
> > >>>>
> > >>>> interface TableDataStreamTransform {
> > >>>>      void transform(Context context);
> > >>>>
> > >>>>      interface Context {
> > >>>>          Table getTable();
> > >>>>          DataStream<Row> toInsertStream(Table);
> > >>>>          DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> > >>>>          DataStream<Row> toChangelogStream(Table);
> > >>>>      }
> > >>>> }
> > >>>>
> > >>>> tEnv
> > >>>>     .createStatementSet()
> > >>>>     .addInsert("mysql", table1)
> > >>>>     .addDataStream(table1, ctx -> {
> > >>>>         ctx.toInsertStream(ctx.getTable())
> > >>>>           .flatMap(..)
> > >>>>           .keyBy(..)
> > >>>>           .process(..)
> > >>>>           .addSink(...);
> > >>>>     })
> > >>>>
> > >>>>
> > >>>> ## Improve dealing with Row in DataStream API
> > >>>>
> > >>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>> - Maybe a `List<String> fieldNames` or `String[] fieldNames`
> > >>>> parameter is enough and handier than a Map?
> > >>>> - Currently, the fieldNames member variable is mutable; is that on
> > >>>> purpose? Can we make it immutable? For example, only accept it from
> > >>>> the constructor.
> > >>>> - Why do we accept a nullable `fieldNames`?
> > >>>>
> > >>>> 7. "a Row has two modes represented by an internal boolean flag
> > >>>> `hasFieldOrder`."
> > >>>> Sorry, I don't fully understand what the `hasFieldOrder` flag
> > >>>> means and what it is used for. Could you explain a bit more about
> > >>>> this?
> > >>>>
> > >>>> Best,
> > >>>> Jark
> > >>>>
> > >>>>
> > >>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org>
> > wrote:
> > >>>>
> > >>>>> Hi David,
> > >>>>>
> > >>>>> thanks for your feedback. Feedback from someone who interacts with
> > many
> > >>>>> users is very valuable. I added an explanation for StatementSets to
> > the
> > >>>>> FLIP.
> > >>>>>
> > >>>>> Regarding watermarks and fromInsertStream, actually the
> > >>>>>
> > >>>>> `Schema.watermark("ts", system_watermark())`
> > >>>>>
> > >>>>> is not really necessary in the `fromChangelogStream`. It is added
> to
> > >>>>> satisfy the Schema interface and be similar to SQL DDL.
> > >>>>>
> > >>>>> We could already extract the watermark strategy if we see
> > >>>>> `system_rowtime()` because in most of the cases we will simply use
> > the
> > >>>>> DataStream API watermarks.
> > >>>>>
> > >>>>> But maybe some users want to generate watermarks after
> preprocessing
> > in
> > >>>>> DataStream API. In these cases users want to define a computed
> > >>>>> watermark expression.
> > >>>>>
> > >>>>> So for simplicity in the Simple API we introduce:
> > >>>>>
> > >>>>> tEnv
> > >>>>>      .fromInsertStream(DataStream<T>)
> > >>>>>      .select('*, system_rowtime().as("rowtime"),
> > >>>>> system_proctime().as("proctime"))
> > >>>>>
> > >>>>> and just rely on the watermarks that travel through DataStream API
> > >>>>> already. I added another comment to the FLIP.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
> > >>>>>
> > >>>>>
> > >>>>> On 19.08.20 10:53, David Anderson wrote:
> > >>>>>> Timo, nice to see this.
> > >>>>>>
> > >>>>>> As someone who expects to use these interfaces, but who doesn't
> > fully
> > >>>>>> understand the existing Table API, I like what I see. Just a
> couple
> > of
> > >>>>>> comments:
> > >>>>>>
> > >>>>>> The way that watermarks fit into the fromChangelogStream case
> makes
> > >>>>>> sense
> > >>>>>> to me, and I'm wondering why watermarks don't come up in the
> > previous
> > >>>>>> section about fromInsertStream.
> > >>>>>>
> > >>>>>> I wasn't familiar with StatementSets, and I couldn't find an
> > >>>>>> explanation
> > >>>>> in
> > >>>>>> the docs. I eventually found this short paragraph in an email from
> > >>>>>> Fabian
> > >>>>>> Hueske, which clarified everything in that section for me:
> > >>>>>>
> > >>>>>>        FLIP-84 [1] added the concept of a "statement set" to group
> > >>>>>> multiple
> > >>>>>> INSERT
> > >>>>>>        INTO statements (SQL or Table API) together. The statements
> > in a
> > >>>>>> statement
> > >>>>>>        set are jointly optimized and executed as a single Flink
> job.
> > >>>>>>
> > >>>>>> Maybe if you add this to the FLIP it will help other readers as
> > well.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> David
> > >>>>>>
> > >>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twalthr@apache.org
> >
> > >>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> I would like to propose a FLIP that aims to resolve the remaining
> > >>>>>>> shortcomings in the Table API:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > >>>>>
> > >>>>>>>
> > >>>>>>> The Table API has received many new features over the last year.
> It
> > >>>>>>> supports a new type system (FLIP-37), connectors support
> changelogs
> > >>>>>>> (FLIP-95), we have well defined internal data structures
> (FLIP-95),
> > >>>>>>> support for result retrieval in an interactive fashion (FLIP-84),
> > and
> > >>>>>>> soon new TableDescriptors (FLIP-129).
> > >>>>>>>
> > >>>>>>> However, the interfaces from and to DataStream API have not been
> > >>>>>>> touched
> > >>>>>>> during the introduction of these new features and are kind of
> > >>>>>>> outdated.
> > >>>>>>> The interfaces lack important functionality that is available in
> > >>>>>>> Table
> > >>>>>>> API but not exposed to DataStream API users. DataStream API is
> > >>>>>>> still our
> > >>>>>>> most important API which is why a good interoperability is
> crucial.
> > >>>>>>>
> > >>>>>>> This FLIP is a mixture of different topics that improve the
> > >>>>>>> interoperability between DataStream and Table API in terms of:
> > >>>>>>>
> > >>>>>>> - DataStream <-> Table conversion
> > >>>>>>> - translation of type systems TypeInformation <-> DataType
> > >>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
> > >>>>>>> - changelog handling
> > >>>>>>> - row handling in DataStream API
> > >>>>>>>
> > >>>>>>> I'm looking forward to your feedback.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
> --
> Best, Jingsong Lee
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Timo for driving this.

My first impression is: can we not deprecate these APIs?
- StreamTableEnvironment.fromDataStream(DataStream<T>): Table
- StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...):
Table
- StreamTableEnvironment.createTemporaryView(String, DataStream<T>,
Expression...): Unit
- StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit
- StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz):
DataStream<T>
- StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T>
typeInfo): DataStream<T>

I think they may also be commonly used APIs. My intuitive feeling is that
the API of the table layer is changing too fast, with a lot of changes in
each version. Even if methods are only "deprecated", they will be removed
one day. We should avoid the change unless there's a strong reason.

1. fromDataStream vs. fromInsertStream:
- In big data systems, and in our previous API designs including DDL, the
default is pure insert. CDC, upsert, and delete came later as
supplementary extension capabilities. Therefore, the default is insert,
which is known and familiar to users. So I think "fromDataStream" can stay
as it is.

2. toAppendStream vs. toInsertStream:
- What is the difference between append and insert? I don't think there is
a clear distinction between them in our daily discussions. For me, Append
is OK.

3. Calling `.rowtime()` and `.proctime()` on fields that don't exist caused
further misunderstandings:
- This API is also widely used, even in our test code. Although we have
introduced DDL, our test code has not been switched.
- Regarding the misunderstanding itself: can we remove it by modifying the
behavior instead? For example, disallow duplicate names. As far as I know,
introducing a new column name is the most widely used pattern.
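
For example, a sketch of the behavior I would propose (reusing the existing
`fromDataStream(DataStream, Expression...)` signature):

// OK: "rowtime" is a new column name, the attribute is appended
tableEnv.fromDataStream(ds, $("user"), $("ts"), $("rowtime").rowtime());

// Error: "ts" already exists in the input type, duplicate names rejected
tableEnv.fromDataStream(ds, $("user"), $("ts").rowtime());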

4. toAppendStream(Table table, Class<T>/TypeInformation):
- I know an AbstractDataType is more powerful, but I think a simple Class
or TypeInformation is easier for DataStream users to accept: it is
simpler, and they don't have to take care of DataType at all.
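
E.g. what DataStream users already write today (MyPojo is just a
placeholder class):

// convert using a simple class
DataStream<MyPojo> pojos = tableEnv.toAppendStream(table, MyPojo.class);

// or using TypeInformation
DataStream<Row> rows =
    tableEnv.toAppendStream(table, Types.ROW(Types.INT, Types.STRING));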

I don't have a strong opinion on these, but I feel it's best not to have an
impact on non-CDC users.

Best,
Jingsong

On Tue, Sep 1, 2020 at 9:10 PM Timo Walther <tw...@apache.org> wrote:

> Thanks for the healthy discussion, Jark and Dawid.
>
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>
> Yes, I'm concerned about the per-record performance. A converter
> or serializer should prepare an immutable Map instance beforehand (stored in
> a member variable) that is simply passed to every new Row instance.
>
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
>
> The accumulator code in the FLIP is just an example; sure, in this
> example we could use a POJO. But in general it should also be easy for
> DataStream API users to quickly create a Row and use names instead of
> indices for code readability.
>
> I think we should not add too much validation to the setters to keep the
> runtime overhead low.
>
> Users should not mix position-based and string-based setters if they
> construct rows themselves. If they do, the result depends on the calling
> order. IMHO this should be straightforward once the concept is clear.
>
> Row row = new Row(2);
> row.setField(0, "ABC"); // always position 0
> row.setField(1, "ABC"); // always position 1
> row.setField("f1", "ABC"); // position 0 because first usage of "f1"
> row.setField("f0", "ABC"); // position 1 because first usage of "f0"
> row.setField("f1", "ABC"); // position 0 because second usage of "f1"
>
> Row row = new Row(2);
> row.setField("f0", "ABC"); // position 0 because first usage of "f0"
> row.setField(0, "ABC");    // always position 0
>
> Row row = new Row(2, fieldNames);
> row.setField(0, "ABC"); // always position 0
> row.setField("f1", "ABC"); // position defined by fieldNames
>
> Regards,
> Timo
>
> On 01.09.20 14:51, Jark Wu wrote:
> > Hi Timo,
> >
> > Thanks for the quick response.
> >
> > 5. "StreamStatementSet#attachToStream()"
> > Joining or using connect() with a different DataStream is a good case.
> > cc @Godfrey, what do you think about the `attachToStream()` API?
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >> We need a Map for constant-time mapping of field name to index.
> > But we can easily build the Map from the List<String> fieldNames in
> > the Row constructor.
> > IMO, manually building the Map and mapping names to indices is verbose
> and
> > error-prone.
> > Are you concerned about the per-record performance?
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> > Thanks for the explanation.
> > Regarding the case (b), I have the same confusion as Dawid about what
> > the result is when index-based setters and name-based setters are
> > mixed (esp. in foreach and if branches).
> > TBH, I don't see a strong need for named setters. Using it as the UDAF
> > accumulator is not as good as POJO in terms of performance and ease of
> use.
> >
> > Best,
> > Jark
> >
> > On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
> > wrote:
> >
> >> Hi all,
> >>
> >> I really like the ideas of this FLIP. I think it improves user
> >> experience quite a bit. I wanted to add just two comments:
> >>
> >> 1. As for the StatementSet I like the approach described in the FLIP for
> >> its simplicity. Moreover the way I see it is that if a user wants to
> >> work with DataStream, then he/she wants to end up in the DataStream API,
> >> or in other words call the StreamExecutionEnvironment#execute.
> >>
> >> 2. @Timo What is the interaction between Row setters from the different
> >> modes? What happens if the user calls both in different order. E.g.
> >>
> >> row.setField(0, "ABC");
> >>
> >> row.setField("f0", "ABC"); // is this a valid call ?
> >>
> >> or
> >>
> >> row.setField("f0", "ABC");
> >>
> >> row.setField(0, "ABC"); // is this a valid call ?
> >>
> >> or
> >>
> >> row.setFieldNames(...);
> >>
> >> row.setField(0, "ABC"); // is this a valid call ?
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 01/09/2020 11:49, Timo Walther wrote:
> >>> Hi Jark,
> >>>
> >>> thanks for the detailed review. Let me answer your concerns:
> >>>
> >>> ## Conversion of DataStream to Table
> >>>
> >>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> >>> leaf of a QueryOperation tree in the validation phase."
> >>> I'm fine with allowing `system_proctime` everywhere in the query. Also
> >>> for SQL, I think we should have done that earlier already to give
> >>> users the chance to have time based operations also at later stages.
> >>>
> >>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> >>> assigned implicitly. "
> >>> Yes, we just use the DataStream API watermark. `system_rowtime()` will
> >>> just introduce a time attribute, the watermark travels to the Table
> >>> API and into DataStream API without further code changes.
> >>>
> >>> ## Conversion of Table to DataStream
> >>>
> >>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> >>> DataStream<Row>"
> >>> 4. "Table.execute(ChangelogMode)"
> >>> Filtering UPDATE_BEFORE is already quite important as it reduces the
> >>> amount of data by a factor of 2. But I also understand your concerns
> >>> regarding confusing users. I also got the request for a
> >>> `Table.getChangelogMode()` a couple of times in the past, because
> >>> users would like to get information about the kind of query that is
> >>> executed. However, in this case `toChangelogStream(Table)` is
> >>> equivalent to calling `toChangelogStream(Table.getChangelogMode(),
> >>> Table)` so we don't need `Table.getChangelogMode()` in the current
> >>> FLIP design. But this can be future work. Let's start with
> >>> `toChangelogStream(Table)` and wait for more feedback about this new
> >>> feature. What do others think?
> >>>
> >>> ## Conversion of StatementSet to DataStream API
> >>>
> >>> 5. "StreamStatementSet#attachToStream()"
> >>>
> >>> I think Godfrey's proposal is too complex for regular users. Instead
> >>> of continuing with the fluent programming, we would force users to
> >>> define a DataStream pipeline in a lambda.
> >>>
> >>> Furthermore, joining or using connect() with a different DataStream
> >>> source would not be possible in this design.
> >>>
> >>> The `execute()` method of `StatementSet` should not execute the
> >>> DataStream API subprogram. It mixes the concepts because we tell
> >>> users: "If you use toDataStream" you need to use
> >>> `StreamExecutionEnvironment.execute()`.
> >>>
> >>> We don't solve every potential use case with the current FLIP design
> >>> but the most important one where a pipeline just uses an INSERT INTO
> >>> but also uses Table API for connectors and preprocessing and does the
> >>> main logic in DataStream API:
> >>>
> >>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
> >>>
> >>> I would consider `StatementSet.addDataStream(Table, ...)` future work
> >>> for now as it is only an optimization for reusing parts of the
> >>> StreamGraph. We could even perform this optimization when calling
> >>> `toInsertStream` or `toChangelogStream`.
> >>>
> >>> ## Improve dealing with Row in DataStream API
> >>>
> >>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >>>
> >>> We need a Map for constant-time mapping of field name to index.
> >>>
> >>> We accept a nullable `fieldNames` because names are not mandatory; one
> >>> can also work with indices as before.
> >>>
> >>> But you are right that the fieldNames member variable can be
> >>> immutable. I just wanted to avoid too many overloaded constructors.
> >>> I'm fine with having one full constructor for RowKind, arity and field
> >>> names (or null).
> >>>
> >>> 7. "a Row has two modes represented by an internal boolean flag
> >>> `hasFieldOrder`."
> >>> Maybe I leaked too many implementation details there that rather
> >>> confuse readers than help. Internally, we need to distinguish between
> >>> two kinds of rows. A user should not be bothered by this.
> >>>
> >>> a) Row comes from Table API runtime: hasFieldOrder = true
> >>> Map("myAge" -> 0, "myName" -> 1)
> >>>
> >>> row.getField("myName") == row.getField(1)
> >>> row.getField("myAge") == row.getField(0)
> >>>
> >>> b) Row comes from user: hasFieldOrder = false
> >>> Row row = new Row(2);
> >>> row.setField("myName", "Alice");
> >>> row.setField("myAge", 32);
> >>>
> >>> Map("myAge" -> 1, "myName" -> 0)
> >>>
> >>> But the type information will decide about the order of the fields
> >>> later and reorder them accordingly during serialization or RowData
> >>> conversion:
> >>>
> >>> ["myName", "myAge"] vs. ["myAge", "myName"]
> >>>
> >>> The user does not need to care about this as it always feels natural
> >>> to deal with the rows.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 01.09.20 06:19, Jark Wu wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Thanks a lot for the great proposal and sorry for the late reply.
> >>>> This is
> >>>> an important improvement for DataStream and Table API users.
> >>>>
> >>>> I have listed my thoughts and questions below ;-)
> >>>>
> >>>> ## Conversion of DataStream to Table
> >>>>
> >>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> >>>> leaf of
> >>>> a QueryOperation tree in the validation phase."
> >>>> IIUC, that means `system_rowtime()` can only be used in the first
> >>>> `select()` after `fromXxxStream()`, right?
> >>>> However, I think `system_proctime()` shouldn't have this limitation,
> >>>> because it doesn't rely on the underlying timestamp of StreamRecord
> and
> >>>>    can be generated in any stage of the query.
> >>>>
> >>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> >>>> assigned implicitly. "
> >>>> What watermark will be used here? Is the pre-assigned watermark in the
> >>>> DataStream (so-called `system_watermark()`)?
> >>>>
> >>>> ## Conversion of Table to DataStream
> >>>>
> >>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> >>>> DataStream<Row>"
> >>>> I'm not sure whether this method is useful for users. Currently, the
> >>>> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
> >>>> filtering UPDATE_BEFORE if possible.
> >>>> However, if we expose this method to users, it may be confusing.
> >>>> Users may
> >>>> try to use this method to convert a changelog stream to an insert-only
> >>>> stream by applying ChangelogMode.insertOnly(). This might be
> misleading.
> >>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> >>>> have
> >>>> to know the ChangelogMode of the current Table first, and remove
> >>>> UPDATE_BEFORE from the ChangelogMode.
> >>>> That means we have to support `Table.getChangelogMode()` first? But
> >>>> `ChangelogMode` derivation requires a full optimization path on the
> >>>> Table,
> >>>> which seems impossible now.
> >>>> Therefore, IMHO, we can introduce this interface in the future if
> users
> >>>> indeed need this. For most users, I think `toChangelogStream(Table)`
> is
> >>>> enough.
> >>>>
> >>>> 4. "Table.execute(ChangelogMode)"
> >>>> Ditto.
> >>>>
> >>>> ## Conversion of StatementSet to DataStream API
> >>>>
> >>>> 5. "StreamStatementSet#attachToStream()"
> >>>> I think the potential drawback is that it can't support multi-sink
> >>>> optimization, i.e. sharing the pipeline.
> >>>> For example, say we have a Table `t1` (a heavy view using joins and
> >>>> aggregates), we want to sink it to "mysql" using SQL, and we want to
> >>>> continue processing using DataStream in the same job.
> >>>> It's a huge waste of resources if we re-compute `t1`. It would be
> >>>> nice if
> >>>> we can come up with a solution to share the pipeline.
> >>>>
> >>>> I borrowed Godfrey's idea in FLINK-18840 and added some
> >>>> modifications. What
> >>>> do you think about the following proposal?
> >>>>
> >>>> interface StatementSet {
> >>>>      StatementSet addDataStream(Table table, TableDataStreamTransform
> >>>> transform);
> >>>> }
> >>>>
> >>>> interface TableDataStreamTransform {
> >>>>      void transform(Context context);
> >>>>
> >>>>      interface Context {
> >>>>          Table getTable();
> >>>>          DataStream<Row> toInsertStream(Table);
> >>>>          DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> >>>>          DataStream<Row> toChangelogStream(Table);
> >>>>      }
> >>>> }
> >>>>
> >>>> tEnv
> >>>>     .createStatementSet()
> >>>>     .addInsert("mysql", table1)
> >>>>     .addDataStream(table1, ctx -> {
> >>>>         ctx.toInsertStream(ctx.getTable())
> >>>>           .flatMap(..)
> >>>>           .keyBy(..)
> >>>>           .process(..)
> >>>>           .addSink(...);
> >>>>     })
> >>>>
> >>>>
> >>>> ## Improve dealing with Row in DataStream API
> >>>>
> >>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >>>> - Maybe a `List<String> fieldNames` or `String[] fieldNames`
> >>>> parameter is enough and handier than a Map?
> >>>> - Currently, the fieldNames member variable is mutable; is that on
> >>>> purpose? Can we make it immutable? For example, only accept it from
> >>>> the constructor.
> >>>> - Why do we accept a nullable `fieldNames`?
> >>>>
> >>>> 7. "a Row has two modes represented by an internal boolean flag
> >>>> `hasFieldOrder`."
> >>>> Sorry, I don't fully understand what the `hasFieldOrder` flag means
> >>>> and what it is used for. Could you explain a bit more about this?
> >>>>
> >>>> Best,
> >>>> Jark
> >>>>
> >>>>
> >>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org>
> wrote:
> >>>>
> >>>>> Hi David,
> >>>>>
> >>>>> thanks for your feedback. Feedback from someone who interacts with
> many
> >>>>> users is very valuable. I added an explanation for StatementSets to
> the
> >>>>> FLIP.
> >>>>>
> >>>>> Regarding watermarks and fromInsertStream, actually the
> >>>>>
> >>>>> `Schema.watermark("ts", system_watermark())`
> >>>>>
> >>>>> is not really necessary in the `fromChangelogStream`. It is added to
> >>>>> satisfy the Schema interface and be similar to SQL DDL.
> >>>>>
> >>>>> We could already extract the watermark strategy if we see
> >>>>> `system_rowtime()` because in most of the cases we will simply use
> the
> >>>>> DataStream API watermarks.
> >>>>>
> >>>>> But maybe some users want to generate watermarks after preprocessing
> in
> >>>>> DataStream API. In these cases users want to define a computed
> >>>>> watermark expression.
> >>>>>
> >>>>> So for simplicity in the Simple API we introduce:
> >>>>>
> >>>>> tEnv
> >>>>>      .fromInsertStream(DataStream<T>)
> >>>>>      .select('*, system_rowtime().as("rowtime"),
> >>>>> system_proctime().as("proctime"))
> >>>>>
> >>>>> and just rely on the watermarks that travel through DataStream API
> >>>>> already. I added another comment to the FLIP.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 19.08.20 10:53, David Anderson wrote:
> >>>>>> Timo, nice to see this.
> >>>>>>
> >>>>>> As someone who expects to use these interfaces, but who doesn't
> fully
> >>>>>> understand the existing Table API, I like what I see. Just a couple
> of
> >>>>>> comments:
> >>>>>>
> >>>>>> The way that watermarks fit into the fromChangelogStream case makes
> >>>>>> sense
> >>>>>> to me, and I'm wondering why watermarks don't come up in the
> previous
> >>>>>> section about fromInsertStream.
> >>>>>>
> >>>>>> I wasn't familiar with StatementSets, and I couldn't find an
> >>>>>> explanation
> >>>>> in
> >>>>>> the docs. I eventually found this short paragraph in an email from
> >>>>>> Fabian
> >>>>>> Hueske, which clarified everything in that section for me:
> >>>>>>
> >>>>>>        FLIP-84 [1] added the concept of a "statement set" to group
> >>>>>> multiple
> >>>>>> INSERT
> >>>>>>        INTO statements (SQL or Table API) together. The statements
> in a
> >>>>>> statement
> >>>>>>        set are jointly optimized and executed as a single Flink job.
> >>>>>>
> >>>>>> Maybe if you add this to the FLIP it will help other readers as
> well.
> >>>>>>
> >>>>>> Best,
> >>>>>> David
> >>>>>>
> >>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> I would like to propose a FLIP that aims to resolve the remaining
> >>>>>>> shortcomings in the Table API:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>
> >>>>>>>
> >>>>>>> The Table API has received many new features over the last year. It
> >>>>>>> supports a new type system (FLIP-37), connectors support changelogs
> >>>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >>>>>>> support for result retrieval in an interactive fashion (FLIP-84),
> and
> >>>>>>> soon new TableDescriptors (FLIP-129).
> >>>>>>>
> >>>>>>> However, the interfaces from and to DataStream API have not been
> >>>>>>> touched
> >>>>>>> during the introduction of these new features and are kind of
> >>>>>>> outdated.
> >>>>>>> The interfaces lack important functionality that is available in
> >>>>>>> Table
> >>>>>>> API but not exposed to DataStream API users. DataStream API is
> >>>>>>> still our
> >>>>>>> most important API which is why a good interoperability is crucial.
> >>>>>>>
> >>>>>>> This FLIP is a mixture of different topics that improve the
> >>>>>>> interoperability between DataStream and Table API in terms of:
> >>>>>>>
> >>>>>>> - DataStream <-> Table conversion
> >>>>>>> - translation of type systems TypeInformation <-> DataType
> >>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
> >>>>>>> - changelog handling
> >>>>>>> - row handling in DataStream API
> >>>>>>>
> >>>>>>> I'm looking forward to your feedback.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Thanks for the healthy discussion, Jark and Dawid.

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"

Yes, I'm concerned about the per-record performance. A converter or
serializer should prepare an immutable Map instance beforehand (stored in
a member variable) that is simply passed to every new Row instance.
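
A minimal sketch of what I have in mind (the helper and field names here
are illustrative, not part of the proposal):

// built once when the converter/serializer is created
private final Map<String, Integer> fieldNames =
    Collections.unmodifiableMap(buildNameToIndexMap(rowType));

private Row newRow() {
    // the same immutable Map instance is shared by all emitted rows
    return new Row(RowKind.INSERT, arity, fieldNames);
}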

7. "a Row has two modes represented by an internal boolean flag 
`hasFieldOrder`."

The accumulator code in the FLIP is just an example; sure, in this
example we could use a POJO. But in general it should also be easy for
DataStream API users to quickly create a Row and use names instead of 
indices for code readability.

I think we should not add too much validation to the setters to keep the
runtime overhead low.

Users should not mix position-based and string-based setters if they 
construct rows themselves. If they do, the result depends on the calling 
order. IMHO this should be straightforward once the concept is clear.

Row row = new Row(2);
row.setField(0, "ABC"); // always position 0
row.setField(1, "ABC"); // always position 1
row.setField("f1", "ABC"); // position 0 because first usage of "f1"
row.setField("f0", "ABC"); // position 1 because first usage of "f0"
row.setField("f1", "ABC"); // position 0 because second usage of "f1"

Row row = new Row(2);
row.setField("f0", "ABC"); // position 0 because first usage of "f0"
row.setField(0, "ABC");    // always position 0

Row row = new Row(2, fieldNames);
row.setField(0, "ABC"); // always position 0
row.setField("f1", "ABC"); // position defined by fieldNames

Regards,
Timo

On 01.09.20 14:51, Jark Wu wrote:
> Hi Timo,
> 
> Thanks for the quick response.
> 
> 5. "StreamStatementSet#attachToStream()"
> Joining or using connect() with a different DataStream is a good case.
> cc @Godfrey, what do you think about the `attachToStream()` API?
> 
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>> We need a Map for constant-time mapping of field name to index.
> But we can easily build the Map from the List<String> fieldNames in the
> Row constructor.
> IMO, manually building the Map and mapping names to indices is verbose and
> error-prone.
> Are you concerned about the per-record performance?
> 
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
> Thanks for the explanation.
> Regarding the case (b), I have the same confusion as Dawid about what
> the result is when index-based setters and name-based setters are mixed
> (esp. in foreach and if branches).
> TBH, I don't see a strong need for named setters. Using it as the UDAF
> accumulator is not as good as POJO in terms of performance and ease of use.
> 
> Best,
> Jark
> 
> On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
> wrote:
> 
>> Hi all,
>>
>> I really like the ideas of this FLIP. I think it improves user
>> experience quite a bit. I wanted to add just two comments:
>>
>> 1. As for the StatementSet I like the approach described in the FLIP for
>> its simplicity. Moreover the way I see it is that if a user wants to
>> work with DataStream, then he/she wants to end up in the DataStream API,
>> or in other words call the StreamExecutionEnvironment#execute.
>>
>> 2. @Timo What is the interaction between Row setters from the different
>> modes? What happens if the user calls both in different order. E.g.
>>
>> row.setField(0, "ABC");
>>
>> row.setField("f0", "ABC"); // is this a valid call ?
>>
>> or
>>
>> row.setField("f0", "ABC");
>>
>> row.setField(0, "ABC"); // is this a valid call ?
>>
>> or
>>
>> row.setFieldNames(...);
>>
>> row.setField(0, "ABC"); // is this a valid call ?
>>
>> Best,
>>
>> Dawid
>>
>> On 01/09/2020 11:49, Timo Walther wrote:
>>> Hi Jark,
>>>
>>> thanks for the detailed review. Let me answer your concerns:
>>>
>>> ## Conversion of DataStream to Table
>>>
>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
>>> leaf of a QueryOperation tree in the validation phase."
>>> I'm fine with allowing `system_proctime` everywhere in the query. Also
>>> for SQL, I think we should have done that earlier already to give
>>> users the chance to have time based operations also at later stages.
>>>
>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
>>> assigned implicitly. "
>>> Yes, we just use the DataStream API watermark. `system_rowtime()` will
>>> just introduce a time attribute, the watermark travels to the Table
>>> API and into DataStream API without further code changes.
>>>
>>> ## Conversion of Table to DataStream
>>>
>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
>>> DataStream<Row>"
>>> 4. "Table.execute(ChangelogMode)"
>>> Filtering UPDATE_BEFORE is already quite important as it reduces the
>>> amount of data by a factor of 2. But I also understand your concerns
>>> regarding confusing users. I also got the request for a
>>> `Table.getChangelogMode()` a couple of times in the past, because
>>> users would like to get information about the kind of query that is
>>> executed. However, in this case `toChangelogStream(Table)` is
>>> equivalent to calling `toChangelogStream(Table.getChangelogMode(),
>>> Table)` so we don't need `Table.getChangelogMode()` in the current
>>> FLIP design. But this can be future work. Let's start with
>>> `toChangelogStream(Table)` and wait for more feedback about this new
>>> feature. What do others think?
>>>
>>> ## Conversion of StatementSet to DataStream API
>>>
>>> 5. "StreamStatementSet#attachToStream()"
>>>
>>> I think Godfrey's proposal is too complex for regular users. Instead
>>> of continuing with the fluent programming, we would force users to
>>> define a DataStream pipeline in a lambda.
>>>
>>> Furthermore, joining or using connect() with a different DataStream
>>> source would not be possible in this design.
>>>
>>> The `execute()` method of `StatementSet` should not execute the
>>> DataStream API subprogram. It mixes the concepts because we tell
>>> users: "If you use toDataStream" you need to use
>>> `StreamExecutionEnvironment.execute()`.
>>>
>>> We don't solve every potential use case with the current FLIP design
>>> but the most important one where a pipeline just uses an INSERT INTO
>>> but also uses Table API for connectors and preprocessing and does the
>>> main logic in DataStream API:
>>>
>>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
>>>
>>> I would consider `StatementSet.addDataStream(Table, ...)` future work
>>> for now as it is only an optimization for reusing parts of the
>>> StreamGraph. We could even perform this optimization when calling
>>> `toInsertStream` or `toChangelogStream`.
>>>
>>> ## Improve dealing with Row in DataStream API
>>>
>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>>>
>>> We need a Map for constant-time mapping of field name to index.
>>>
>>> We accept a nullable `fieldNames` because names are not mandatory; one
>>> can also work with indices as before.
>>>
>>> But you are right that the fieldNames member variable can be
>>> immutable. I just wanted to avoid too many overloaded constructors.
>>> I'm fine with having one full constructor for RowKind, arity and field
>>> names (or null).
>>>
>>> 7. "a Row has two modes represented by an internal boolean flag
>>> `hasFieldOrder`."
>>> Maybe I leaked too many implementation details there that rather
>>> confuse readers than help. Internally, we need to distinguish between
>>> two kinds of rows. A user should not be bothered by this.
>>>
>>> a) Row comes from Table API runtime: hasFieldOrder = true
>>> Map("myAge" -> 0, "myName" -> 1)
>>>
>>> row.getField("myName") == row.getField(1)
>>> row.getField("myAge") == row.getField(0)
>>>
>>> b) Row comes from user: hasFieldOrder = false
>>> Row row = new Row(2);
>>> row.setField("myName", "Alice");
>>> row.setField("myAge", 32);
>>>
>>> Map("myAge" -> 1, "myName" -> 0)
>>>
>>> But the type information will decide about the order of the fields
>>> later and reorder them accordingly during serialization or RowData
>>> conversion:
>>>
>>> ["myName", "myAge"] vs. ["myAge", "myName"]
>>>
>>> The user does not need to care about this as it always feels natural
>>> to deal with the rows.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 01.09.20 06:19, Jark Wu wrote:
>>>> Hi Timo,
>>>>
>>>> Thanks a lot for the great proposal and sorry for the late reply.
>>>> This is
>>>> an important improvement for DataStream and Table API users.
>>>>
>>>> I have listed my thoughts and questions below ;-)
>>>>
>>>> ## Conversion of DataStream to Table
>>>>
>>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
>>>> leaf of
>>>> a QueryOperation tree in the validation phase."
>>>> IIUC, that means `system_rowtime()` can only be used in the first
>>>> `select()` after `fromXxxStream()`, right?
>>>> However, I think `system_proctime()` shouldn't have this limitation,
>>>> because it doesn't rely on the underlying timestamp of StreamRecord and
>>>>    can be generated in any stage of the query.
>>>>
>>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
>>>> assigned implicitly. "
>>>> What watermark will be used here? Is the pre-assigned watermark in the
>>>> DataStream (so-called `system_watermark()`)?
>>>>
>>>> ## Conversion of Table to DataStream
>>>>
>>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
>>>> DataStream<Row>"
>>>> I'm not sure whether this method is useful for users. Currently, the
>>>> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
>>>> filtering UPDATE_BEFORE if possible.
>>>> However, if we expose this method to users, it may be confusing.
>>>> Users may
>>>> try to use this method to convert a changelog stream to an insert-only
>>>> stream by applying ChangelogMode.insertOnly(). This might be misleading.
>>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
>>>> have
>>>> to know the ChangelogMode of the current Table first, and remove
>>>> UPDATE_BEFORE from the ChangelogMode.
>>>> That means we have to support `Table.getChangelogMode()` first? But
>>>> `ChangelogMode` derivation requires a full optimization path on the
>>>> Table,
>>>> which seems impossible now.
>>>> Therefore, IMHO, we can introduce this interface in the future if users
>>>> indeed need this. For most users, I think `toChangelogStream(Table)` is
>>>> enough.
>>>>
>>>> 4. "Table.execute(ChangelogMode)"
>>>> Ditto.
>>>>
>>>> ## Conversion of StatementSet to DataStream API
>>>>
>>>> 5. "StreamStatementSet#attachToStream()"
>>>> I think the potential drawback is that it can't support multi-sink
>>>> optimization, i.e. sharing the pipeline.
>>>> For example, say we have a Table `t1` (a heavy view using joins and
>>>> aggregates), we want to sink it to "mysql" using SQL, and we want to
>>>> continue processing using DataStream in the same job.
>>>> It's a huge waste of resources if we re-compute `t1`. It would be
>>>> nice if
>>>> we can come up with a solution to share the pipeline.
>>>>
>>>> I borrowed Godfrey's idea in FLINK-18840 and added some
>>>> modifications. What
>>>> do you think about the following proposal?
>>>>
>>>> interface StatementSet {
>>>>      StatementSet addDataStream(Table table, TableDataStreamTransform
>>>> transform);
>>>> }
>>>>
>>>> interface TableDataStreamTransform {
>>>>      void transform(Context context);
>>>>
>>>>      interface Context {
>>>>          Table getTable();
>>>>          DataStream<Row> toInsertStream(Table);
>>>>          DataStream<T> toInsertStream(AbstractDataType<?>, Table);
>>>>          DataStream<Row> toChangelogStream(Table);
>>>>      }
>>>> }
>>>>
>>>> tEnv
>>>>     .createStatementSet()
>>>>     .addInsert("mysql", table1)
>>>>     .addDataStream(table1, ctx -> {
>>>>         ctx.toInsertStream(ctx.getTable())
>>>>           .flatMap(..)
>>>>           .keyBy(..)
>>>>           .process(..)
>>>>           .addSink(...);
>>>>     })
>>>>
>>>>
>>>> ## Improve dealing with Row in DataStream API
>>>>
>>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>>>> - Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter
>>>> is enough and handier than a Map?
>>>> - Currently, the fieldNames member variable is mutable; is that on
>>>> purpose? Can we make it immutable? For example, only accept it from
>>>> the constructor.
>>>> - Why do we accept a nullable `fieldNames`?
>>>>
>>>> 7. "a Row has two modes represented by an internal boolean flag
>>>> `hasFieldOrder`."
>>>> Sorry, I don't fully understand what the `hasFieldOrder` flag means
>>>> and what it is used for. Could you explain a bit more about this?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> thanks for your feedback. Feedback from someone who interacts with many
>>>>> users is very valuable. I added an explanation for StatementSets to the
>>>>> FLIP.
>>>>>
>>>>> Regarding watermarks and fromInsertStream, actually the
>>>>>
>>>>> `Schema.watermark("ts", system_watermark())`
>>>>>
>>>>> is not really necessary in the `fromChangelogStream`. It is added to
>>>>> satisfy the Schema interface and be similar to SQL DDL.
>>>>>
>>>>> We could already extract the watermark strategy if we see
>>>>> `system_rowtime()` because in most of the cases we will simply use the
>>>>> DataStream API watermarks.
>>>>>
>>>>> But maybe some users want to generate watermarks after preprocessing in
>>>>> DataStream API. In these cases users want to define a computed watermark
>>>>> expression.
>>>>>
>>>>> So for simplicity in the Simple API we introduce:
>>>>>
>>>>> tEnv
>>>>>      .fromInsertStream(DataStream<T>)
>>>>>      .select('*, system_rowtime().as("rowtime"),
>>>>> system_proctime().as("proctime"))
>>>>>
>>>>> and just rely on the watermarks that travel through DataStream API
>>>>> already. I added another comment to the FLIP.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 19.08.20 10:53, David Anderson wrote:
>>>>>> Timo, nice to see this.
>>>>>>
>>>>>> As someone who expects to use these interfaces, but who doesn't fully
>>>>>> understand the existing Table API, I like what I see. Just a couple of
>>>>>> comments:
>>>>>>
>>>>>> The way that watermarks fit into the fromChangelogStream case makes
>>>>>> sense
>>>>>> to me, and I'm wondering why watermarks don't come up in the previous
>>>>>> section about fromInsertStream.
>>>>>>
>>>>>> I wasn't familiar with StatementSets, and I couldn't find an
>>>>>> explanation
>>>>> in
>>>>>> the docs. I eventually found this short paragraph in an email from
>>>>>> Fabian
>>>>>> Hueske, which clarified everything in that section for me:
>>>>>>
>>>>>>        FLIP-84 [1] added the concept of a "statement set" to group
>>>>>> multiple
>>>>>> INSERT
>>>>>>        INTO statements (SQL or Table API) together. The statements in a
>>>>>> statement
>>>>>>        set are jointly optimized and executed as a single Flink job.
>>>>>>
>>>>>> Maybe if you add this to the FLIP it will help other readers as well.
>>>>>>
>>>>>> Best,
>>>>>> David
>>>>>>
>>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>>>> shortcomings in the Table API:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>
>>>>>>>
>>>>>>> The Table API has received many new features over the last year. It
>>>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>>>> soon new TableDescriptors (FLIP-129).
>>>>>>>
>>>>>>> However, the interfaces from and to DataStream API have not been
>>>>>>> touched
>>>>>>> during the introduction of these new features and are kind of
>>>>>>> outdated.
>>>>>>> The interfaces lack important functionality that is available in
>>>>>>> Table
>>>>>>> API but not exposed to DataStream API users. DataStream API is
>>>>>>> still our
>>>>>>> most important API which is why a good interoperability is crucial.
>>>>>>>
>>>>>>> This FLIP is a mixture of different topics that improve the
>>>>>>> interoperability between DataStream and Table API in terms of:
>>>>>>>
>>>>>>> - DataStream <-> Table conversion
>>>>>>> - translation of type systems TypeInformation <-> DataType
>>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>>>> - changelog handling
>>>>>>> - row handling in DataStream API
>>>>>>>
>>>>>>> I'm looking forward to your feedback.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Hi Timo,

Thanks for the quick response.

5. "StreamStatementSet#attachToStream()"
Joining or using connect() with a different DataStream is a good case.
cc @Godfrey, what do you think about the `attachToStream()` API?
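
For reference, my understanding of the intended usage from the FLIP (a
rough sketch only):

StreamStatementSet set = tEnv.createStatementSet();
set.addInsert("mysql", table1);
set.attachToStream();  // attach the optimized statements to the stream graph
env.execute();         // one job, executed via the DataStream API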

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> We need a Map for constant-time mapping of field name to index.
But we can easily build the Map from the List<String> fieldNames in the
Row constructor.
IMO, manually building the Map and mapping names to indices is verbose and
error-prone.
Are you concerned about the per-record performance?
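
I.e. the Row constructor could build the Map itself, something like this
sketch (the names `nameToIndex` and `toIndexMap` are just illustrative):

public Row(RowKind kind, int arity, @Nullable List<String> fieldNames) {
    this.kind = kind;
    this.fields = new Object[arity];
    // derive the name -> index mapping once, inside the constructor
    this.nameToIndex = fieldNames == null ? null : toIndexMap(fieldNames);
}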

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Thanks for the explanation.
Regarding the case (b), I have the same confusion as Dawid about what the
result is when index-based setters and name-based setters are mixed (esp.
in foreach and if branches; see the sketch below).
TBH, I don't see a strong need for named setters. Using it as the UDAF
accumulator is not as good as POJO in terms of performance and ease of use.
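
To illustrate the confusion: it is hard to reason about the resulting
positions in code like this (sketch):

Row row = new Row(2);
if (condition) {
    row.setField(0, "ABC");    // index-based setter
}
row.setField("f0", "ABC");     // which position does "f0" resolve to?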

Best,
Jark

On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi all,
>
> I really like the ideas of this FLIP. I think it improves user
> experience quite a bit. I wanted to add just two comments:
>
> 1. As for the StatementSet I like the approach described in the FLIP for
> its simplicity. Moreover the way I see it is that if a user wants to
> work with DataStream, then he/she wants to end up in the DataStream API,
> or in other words call the StreamExecutionEnvironment#execute.
>
> 2. @Timo What is the interaction between Row setters from the different
> modes? What happens if the user calls both in different order. E.g.
>
> row.setField(0, "ABC");
>
> row.setField("f0", "ABC"); // is this a valid call ?
>
> or
>
> row.setField("f0", "ABC");
>
> row.setField(0, "ABC"); // is this a valid call ?
>
> or
>
> row.setFieldNames(...);
>
> row.setField(0, "ABC"); // is this a valid call ?
>
> Best,
>
> Dawid
>
> On 01/09/2020 11:49, Timo Walther wrote:
> > Hi Jark,
> >
> > thanks for the detailed review. Let me answer your concerns:
> >
> > ## Conversion of DataStream to Table
> >
> > 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > leaf of a QueryOperation tree in the validation phase."
> > I'm fine with allowing `system_proctime` everywhere in the query. Also
> > for SQL, I think we should have done that earlier already to give
> > users the chance to have time based operations also at later stages.
> >
> > 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > assigned implicitly. "
> > Yes, we just use the DataStream API watermark. `system_rowtime()` will
> > just introduce a time attribute, the watermark travels to the Table
> > API and into DataStream API without further code changes.
> >
> > ## Conversion of Table to DataStream
> >
> > 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > DataStream<Row>"
> > 4. "Table.execute(ChangelogMode)"
> > Filtering UPDATE_BEFORE is already quite important as it reduces the
> > amount of data by a factor of 2. But I also understand your concerns
> > regarding confusing users. I also got the request for a
> > `Table.getChangelogMode()` a couple of times in the past, because
> > users would like to get information about the kind of query that is
> > executed. However, in this case `toChangelogStream(Table)` is
> > equivalent to calling `toChangelogStream(Table.getChangelogMode(),
> > Table)` so we don't need `Table.getChangelogMode()` in the current
> > FLIP design. But this can be future work. Let's start with
> > `toChangelogStream(Table)` and wait for more feedback about this new
> > feature. What do others think?
> >
> > ## Conversion of StatementSet to DataStream API
> >
> > 5. "StreamStatementSet#attachToStream()"
> >
> > I think Godfrey's proposal is too complex for regular users. Instead
> > of continuing with the fluent programming, we would force users to
> > define a DataStream pipeline in a lambda.
> >
> > Furthermore, joining or using connect() with a different DataStream
> > source would not be possible in this design.
> >
> > The `execute()` method of `StatementSet` should not execute the
> > DataStream API subprogram. It mixes the concepts because we tell
> > users: "If you use toDataStream" you need to use
> > `StreamExecutionEnvironment.execute()`.
> >
> > We don't solve every potential use case with the current FLIP design
> > but the most important one where a pipeline just uses an INSERT INTO
> > but also uses Table API for connectors and preprocessing and does the
> > main logic in DataStream API:
> >
> > T1 -> T2, T3 -> DataStream, T4 -> DataStream
> >
> > I would consider `StatementSet.addDataStream(Table, ...)` future work
> > for now as it is only an optimization for reusing parts of the
> > StreamGraph. We could even perform this optimization when calling
> > `toInsertStream` or `toChangelogStream`.
> >
> > ## Improve dealing with Row in DataStream API
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > We need a Map for constant-time mapping of field name to index.
> >
> > We accept a nullable `fieldNames` because names are not mandatory; one
> > can also work with indices as before.
> >
> > But you are right that the fieldNames member variable can be
> > immutable. I just wanted to avoid too many overloaded constructors.
> > I'm fine with having one full constructor for RowKind, arity and field
> > names (or null).
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> > Maybe I leaked too many implementation details there that rather
> > confuse readers than help. Internally, we need to distinguish between
> > two kinds of rows. A user should not be bothered by this.
> >
> > a) Row comes from Table API runtime: hasFieldOrder = true
> > Map("myAge" -> 0, "myName" -> 1)
> >
> > row.getField("myName") == row.getField(1)
> > row.getField("myAge") == row.getField(0)
> >
> > b) Row comes from user: hasFieldOrder = false
> > Row row = new Row(2);
> > row.setField("myName", "Alice");
> > row.setField("myAge", 32);
> >
> > Map("myAge" -> 1, "myName" -> 0)
> >
> > But the type information will decide about the order of the fields
> > later and reorder them accordingly during serialization or RowData
> > conversion:
> >
> > ["myName", "myAge"] vs. ["myAge", "myName"]
> >
> > The user does not need to care about this as it always feels natural
> > to deal with the rows.
> >
> > Regards,
> > Timo
> >
> >
> > On 01.09.20 06:19, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks a lot for the great proposal and sorry for the late reply.
> >> This is
> >> an important improvement for DataStream and Table API users.
> >>
> >> I have listed my thoughts and questions below ;-)
> >>
> >> ## Conversion of DataStream to Table
> >>
> >> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> >> leaf of
> >> a QueryOperation tree in the validation phase."
> >> IIUC, that means `system_rowtime()` can only be used in the first
> >> `select()` after `fromXxxStream()`, right?
> >> However, I think `system_proctime()` shouldn't have this limitation,
> >> because it doesn't rely on the underlying timestamp of StreamRecord and
> >>   can be generated in any stage of the query.
> >>
> >> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> >> assigned implicitly. "
> >> What watermark will be used here? Is the pre-assigned watermark in the
> >> DataStream (so called `system_watermak()`)?
> >>
> >> ## Conversion of Table to DataStream
> >>
> >> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> >> DataStream<Row>"
> >> I'm not sure whether this method is useful for users. Currently, the
> >> `DynamicTableSinks#getChagnelogMode(changelogMode)` is only used for
> >> filtering UPDATE_BEFORE if possible.
> >> However, if we expose this method to users, it may be confusing.
> >> Users may
> >> try to use this method to convert a changelog stream to an insert-only
> >> stream by applying ChangelogMode.insertOnly(). This might be misleading.
> >> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> >> have
> >> to know the ChangelogMode of the current Table first, and remove
> >> UPDATE_BEFORE from the ChagnelogMode.
> >> That means we have to support `Table.getChangelogMode()` first? But
> >> `ChangelogMode` derivation requires a full optimization path on the
> >> Table,
> >> which seems impossible now.
> >> Therefore, IMHO, we can introduce this interface in the future if users
> >> indeed need this. For most users, I think `toChangelogStream(Table)` is
> >> enough.
> >>
> >> 4. "Table.execute(ChangelogMode)"
> >> Ditto.
> >>
> >> ## Conversion of StatementSet to DataStream API
> >>
> >> 5. "StreamStatementSet#attachToStream()"
> >> I think the potential drawback is that it can't support multi-sink
> >> optimization, i.e. share pipeline.
> >> For example, if we have a Table `t1` (a heavy view uses join,
> >> aggregate),
> >> and want to sink to "mysql" using SQL and want to continue processing
> >> using
> >> DataStream in a job.
> >> It's a huge waste of resources if we re-compute `t1`. It would be
> >> nice if
> >> we can come up with a solution to share the pipeline.
> >>
> >> I borrowed Godfrey's idea in FLINK-18840 and added some
> >> modifications. What
> >> do you think about the following proposal?
> >>
> >> interface StatementSet {
> >>     StatementSet addDataStream(Table table, TableDataStreamTransform
> >> transform);
> >> }
> >>
> >> interface TableDataStreamTransform {
> >>     void transform(Context);
> >>
> >>     interface Context {
> >>         Table getTable();
> >>         DataStream<Row> toInsertStream(Table);
> >>         DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> >>         DataStream<Row> toChangelogStream(Table);
> >>     }
> >> }
> >>
> >> tEnv
> >>    .createStatementSet()
> >>    .addInsert("mysql", table1)
> >>    .addDataStream(table1, ctx -> {
> >>        ctx.toInsertStream(ctx.getTable())
> >>          .flatmap(..)
> >>          .keyBy(..)
> >>          .process(..)
> >>          .addSink(...);
> >>    })
> >>
> >>
> >> ## Improve dealing with Row in DataStream API
> >>
> >> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is
> >> enough and more handy than Map ?
> >> - Currently, the fieldNames member variable is mutable, is it on
> >> purpose?
> >> Can we make it immutable? For example, only accept from the constructor.
> >> - Why do we accept a nullable `fieldNames`?
> >>
> >> 7. "a Row has two modes represented by an internal boolean flag
> >> `hasFieldOrder`."
> >> Sorry, I don't fully understand what does the `hasFieldOrder` mean
> >> and is
> >> used for. Could you explain a bit more for this?
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:
> >>
> >>> Hi David,
> >>>
> >>> thanks for your feedback. Feedback from someone who interacts with many
> >>> users is very valuable. I added an explanation for StatementSets to the
> >>> FLIP.
> >>>
> >>> Regarding watermarks and fromInsertStream, actually the
> >>>
> >>> `Schema.watermark("ts", system_watermark())`
> >>>
> >>> is not really necessary in the `fromChangelogStream`. It is added to
> >>> satify the Schema interface and be similar to SQL DDL.
> >>>
> >>> We could already extract the watermark strategy if we see
> >>> `system_rowtime()` because in most of the cases we will simply use the
> >>> DataStream API watermarks.
> >>>
> >>> But maybe some users want to generate watermarks after preprocessing in
> >>> DataStream API. In this cases users what to define a computed watermark
> >>> expression.
> >>>
> >>> So for simplicity in the Simple API we introduce:
> >>>
> >>> tEnv
> >>>     .fromInsertStream(DataStream<T>)
> >>>     .select('*, system_rowtime().as("rowtime"),
> >>> system_proctime().as("proctime"))
> >>>
> >>> and just rely on the watermarks that travel through DataStream API
> >>> already. I added another comment to the FLIP.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 19.08.20 10:53, David Anderson wrote:
> >>>> Timo, nice to see this.
> >>>>
> >>>> As someone who expects to use these interfaces, but who doesn't fully
> >>>> understand the existing Table API, I like what I see. Just a couple of
> >>>> comments:
> >>>>
> >>>> The way that watermarks fit into the fromChangelogStream case makes
> >>>> sense
> >>>> to me, and I'm wondering why watermarks don't come up in the previous
> >>>> section about fromInsertStream.
> >>>>
> >>>> I wasn't familiar with StatementSets, and I couldn't find an
> >>>> explanation
> >>> in
> >>>> the docs. I eventually found this short paragraph in an email from
> >>>> Fabian
> >>>> Hueske, which clarified everything in that section for me:
> >>>>
> >>>>       FLIP-84 [1] added the concept of a "statement set" to group
> >>>> multiple
> >>>> INSERT
> >>>>       INTO statements (SQL or Table API) together. The statements in a
> >>>> statement
> >>>>       set are jointly optimized and executed as a single Flink job.
> >>>>
> >>>> Maybe if you add this to the FLIP it will help other readers as well.
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> I would like to propose a FLIP that aims to resolve the remaining
> >>>>> shortcomings in the Table API:
> >>>>>
> >>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>
> >>>>>
> >>>>> The Table API has received many new features over the last year. It
> >>>>> supports a new type system (FLIP-37), connectors support changelogs
> >>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >>>>> support for result retrieval in an interactive fashion (FLIP-84), and
> >>>>> soon new TableDescriptors (FLIP-129).
> >>>>>
> >>>>> However, the interfaces from and to DataStream API have not been
> >>>>> touched
> >>>>> during the introduction of these new features and are kind of
> >>>>> outdated.
> >>>>> The interfaces lack important functionality that is available in
> >>>>> Table
> >>>>> API but not exposed to DataStream API users. DataStream API is
> >>>>> still our
> >>>>> most important API which is why a good interoperability is crucial.
> >>>>>
> >>>>> This FLIP is a mixture of different topics that improve the
> >>>>> interoperability between DataStream and Table API in terms of:
> >>>>>
> >>>>> - DataStream <-> Table conversion
> >>>>> - translation of type systems TypeInformation <-> DataType
> >>>>> - schema definition (incl. rowtime, watermarks, primary key)
> >>>>> - changelog handling
> >>>>> - row handling in DataStream API
> >>>>>
> >>>>> I'm looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi all,

I really like the ideas of this FLIP. I think it improves user
experience quite a bit. I wanted to add just two comments:

1. As for the StatementSet, I like the approach described in the FLIP for
its simplicity. Moreover, the way I see it is that if a user wants to
work with DataStream, then he/she wants to end up in the DataStream API,
or in other words to call StreamExecutionEnvironment#execute.

2. @Timo What is the interaction between the Row setters from the
different modes? What happens if the user calls both in a different
order? E.g.

row.setField(0, "ABC");
row.setField("f0", "ABC"); // is this a valid call?

or

row.setField("f0", "ABC");
row.setField(0, "ABC"); // is this a valid call?

or

row.setFieldNames(...);
row.setField(0, "ABC"); // is this a valid call?

Best,

Dawid



Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Jark,

thanks for the detailed review. Let me answer your concerns:

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf 
of a QueryOperation tree in the validation phase."
I'm fine with allowing `system_proctime()` everywhere in the query. Also 
for SQL, I think we should have done that earlier already, to give users 
the chance to apply time-based operations at later stages as well.
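
For concreteness, a rough sketch of what I mean (this assumes the 
proposed `fromInsertStream()` and `system_proctime()` from this FLIP and 
uses `$` from `org.apache.flink.table.api.Expressions`; names may still 
change during this discussion):

Table t = tEnv
    .fromInsertStream(stream)
    .groupBy($("user"))
    .select($("user"), $("amount").sum().as("total"))
    // proctime is introduced after the aggregation, not only at the leaf:
    .select($("user"), $("total"), system_proctime().as("proctime"));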

2. "By using `system_rowtime().as("rowtime")` the watermark would be 
assigned implicitly. "
Yes, we just use the DataStream API watermark. `system_rowtime()` will 
just introduce a time attribute; the watermark travels through the Table 
API and back into the DataStream API without further code changes.
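
A minimal sketch of that flow (only `fromInsertStream()` and 
`system_rowtime()` are proposed API; `Event` is a placeholder POJO with 
a `timestamp` field):

DataStream<Event> stream = env
    .fromElements(new Event())
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.timestamp));

// the watermark declared above is reused, nothing to re-declare here:
Table t = tEnv
    .fromInsertStream(stream)
    .select($("*"), system_rowtime().as("rowtime"));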

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): 
DataStream<Row>"
4. "Table.execute(ChangelogMode)"
Filtering UPDATE_BEFORE is already quite important as it reduces the 
amount of data by a factor of 2. But I also understand your concerns 
regarding confusing users. I also got the request for a 
`Table.getChangelogMode()` a couple of times in the past, because users 
would like to get information about the kind of query that is executed. 
However, in this case `toChangelogStream(Table)` is equivalent to calling 
`toChangelogStream(Table.getChangelogMode(), Table)`, so we don't need 
`Table.getChangelogMode()` in the current FLIP design. But this can be 
future work. Let's start with `toChangelogStream(Table)` and wait for 
more feedback about this new feature. What do others think?

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"

I think Godfrey's proposal is too complex for regular users. Instead of 
continuing with fluent programming, we would force users to define a 
DataStream pipeline in a lambda.

Furthermore, joining or using connect() with a different DataStream 
source would not be possible in this design.

The `execute()` method of `StatementSet` should not execute the 
DataStream API subprogram. It mixes the concepts because we tell users: 
"If you use toDataStream, you need to use 
`StreamExecutionEnvironment.execute()`."

We don't solve every potential use case with the current FLIP design, but 
we do solve the most important one, where a pipeline uses just an INSERT 
INTO but also uses Table API for connectors and preprocessing and does 
the main logic in DataStream API:

T1 -> T2, T3 -> DataStream, T4 -> DataStream
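
As a rough sketch of this use case (assuming the proposed 
`StreamStatementSet`, `attachToStream()`, and conversion methods; 
`MainLogic` is a placeholder KeyedProcessFunction):

StreamStatementSet set = tEnv.createStatementSet();
set.addInsert("T2", tEnv.from("T1"));  // T1 -> T2 stays in Table API

DataStream<Row> s3 = tEnv.toInsertStream(tEnv.from("T3"));
DataStream<Row> s4 = tEnv.toChangelogStream(tEnv.from("T4"));

s3.union(s4)
    .keyBy(r -> r.getField(0))
    .process(new MainLogic())
    .print();

set.attachToStream();  // attaches the INSERT INTO part to the StreamGraph
env.execute();         // a single job, triggered via DataStream API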

I would consider `StatementSet.addDataStream(Table, ...)` future work 
for now as it is only an optimization for reusing parts of the 
StreamGraph. We could even perform this optimization when calling 
`toInsertStream` or `toChangelogStream`.

## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"

We need a Map for a constant-time mapping from field name to index.

We accept a nullable `fieldNames` because names are not mandatory; one 
can also work with indices as before.

But you are right that the fieldNames member variable can be immutable. 
I just wanted to avoid too many overloaded constructors. I'm fine with 
having one full constructor for RowKind, arity and field names (or null).
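
A hypothetical shape of that full constructor and its usage 
(illustration only; the exact signature is still up for discussion):

Map<String, Integer> positions = new HashMap<>();
positions.put("myName", 0);
positions.put("myAge", 1);

Row named = new Row(RowKind.INSERT, 2, positions);  // kind, arity, names
Row positional = new Row(RowKind.INSERT, 2, null);  // names are optional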

7. "a Row has two modes represented by an internal boolean flag 
`hasFieldOrder`."
Maybe I leaked too many implementation details there that confuse 
readers rather than help. Internally, we need to distinguish between two 
kinds of rows. A user should not be bothered by this.

a) Row comes from Table API runtime: hasFieldOrder = true
Map("myAge" -> 0, "myName" -> 1)

row.getField("myName") == row.getField(1)
row.getField("myAge") == row.getField(0)

b) Row comes from user: hasFieldOrder = false
Row row = new Row(2);
row.setField("myName", "Alice");
row.setField("myAge", 32);

Map("myAge" -> 1, "myName" -> 0)

But the type information will decide the order of the fields later 
and reorder them accordingly during serialization or RowData conversion:

["myName", "myAge"] vs. ["myAge", "myName"]

The user need not care about this, as it always feels natural to deal 
with the rows.

Regards,
Timo




Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Hi Timo,

Thanks a lot for the great proposal and sorry for the late reply. This is
an important improvement for DataStream and Table API users.

I have listed my thoughts and questions below ;-)

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of
a QueryOperation tree in the validation phase."
IIUC, that means `system_rowtime()` can only be used in the first
`select()` after `fromXxxStream()`, right?
However, I think `system_proctime()` shouldn't have this limitation,
because it doesn't rely on the underlying timestamp of the StreamRecord
and can be generated at any stage of the query.

2. "By using `system_rowtime().as("rowtime")` the watermark would be
assigned implicitly. "
What watermark will be used here? Is it the pre-assigned watermark in the
DataStream (the so-called `system_watermark()`)?

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
DataStream<Row>"
I'm not sure whether this method is useful for users. Currently, the
`DynamicTableSink#getChangelogMode(changelogMode)` is only used for
filtering UPDATE_BEFORE if possible.
However, if we expose this method to users, it may be confusing. Users may
try to use this method to convert a changelog stream to an insert-only
stream by applying ChangelogMode.insertOnly(). This might be misleading.
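
E.g. a sketch of the misuse I have in mind:

DataStream<Row> stream =
    tEnv.toChangelogStream(ChangelogMode.insertOnly(), table);
// reads like a conversion to an insert-only stream, but that is not
// what the ChangelogMode parameter is meant for
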
What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have
to know the ChangelogMode of the current Table first, and remove
UPDATE_BEFORE from that ChangelogMode.
That means we have to support `Table.getChangelogMode()` first? But
`ChangelogMode` derivation requires a full optimization path on the Table,
which seems impossible now.
Therefore, IMHO, we can introduce this interface in the future if users
indeed need this. For most users, I think `toChangelogStream(Table)` is
enough.

4. "Table.execute(ChangelogMode)"
Ditto.

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"
I think the potential drawback is that it can't support multi-sink
optimization, i.e. share pipeline.
For example, if we have a Table `t1` (a heavy view uses join, aggregate),
and want to sink to "mysql" using SQL and want to continue processing using
DataStream in a job.
It's a huge waste of resources if we re-compute `t1`. It would be nice if
we can come up with a solution to share the pipeline.

I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What
do you think about the following proposal?

interface StatementSet {
    StatementSet addDataStream(Table table, TableDataStreamTransform transform);
}

interface TableDataStreamTransform {
    void transform(Context context);

    interface Context {
        Table getTable();
        DataStream<Row> toInsertStream(Table table);
        <T> DataStream<T> toInsertStream(AbstractDataType<?> dataType, Table table);
        DataStream<Row> toChangelogStream(Table table);
    }
}

tEnv
  .createStatementSet()
  .addInsert("mysql", table1)
  .addDataStream(table1, ctx -> {
      ctx.toInsertStream(ctx.getTable())
        .flatMap(..)
        .keyBy(..)
        .process(..)
        .addSink(...);
  })


## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
- Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is
enough and handier than a Map?
- Currently, the fieldNames member variable is mutable; is that on purpose?
Can we make it immutable? For example, only accept it from the constructor.
- Why do we accept a nullable `fieldNames`?

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Sorry, I don't fully understand what `hasFieldOrder` means and what it is
used for. Could you explain a bit more?

Best,
Jark



Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi David,

thanks for your feedback. Feedback from someone who interacts with many 
users is very valuable. I added an explanation for StatementSets to the 
FLIP.

Regarding watermarks and fromInsertStream, actually the

`Schema.watermark("ts", system_watermark())`

is not really necessary in the `fromChangelogStream`. It is added to 
satisfy the Schema interface and to be similar to SQL DDL.

We could already extract the watermark strategy if we see 
`system_rowtime()` because in most cases we will simply use the 
DataStream API watermarks.

But maybe some users want to generate watermarks after preprocessing in 
the DataStream API. In these cases, users want to define a computed 
watermark expression.
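
A rough sketch of such a declaration (assuming an overload of 
`fromInsertStream` that accepts a `Schema` and a builder-style 
construction; both are details the FLIP still has to pin down):

tEnv.fromInsertStream(
    preprocessedStream,
    Schema.newBuilder()
        .watermark("ts", "ts - INTERVAL '5' SECOND")  // computed expression
        .build());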

So for simplicity in the Simple API we introduce:

tEnv
    .fromInsertStream(DataStream<T>)
    .select('*, system_rowtime().as("rowtime"),
            system_proctime().as("proctime"))

and just rely on the watermarks that travel through DataStream API 
already. I added another comment to the FLIP.

Regards,
Timo




Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by David Anderson <da...@alpinegizmo.com>.
Timo, nice to see this.

As someone who expects to use these interfaces, but who doesn't fully
understand the existing Table API, I like what I see. Just a couple of
comments:

The way that watermarks fit into the fromChangelogStream case makes sense
to me, and I'm wondering why watermarks don't come up in the previous
section about fromInsertStream.

I wasn't familiar with StatementSets, and I couldn't find an explanation in
the docs. I eventually found this short paragraph in an email from Fabian
Hueske, which clarified everything in that section for me:

    FLIP-84 [1] added the concept of a "statement set" to group multiple
    INSERT INTO statements (SQL or Table API) together. The statements in a
    statement set are jointly optimized and executed as a single Flink job.

Maybe if you add this to the FLIP it will help other readers as well.

Best,
David
