Posted to dev@flink.apache.org by Jark Wu <im...@gmail.com> on 2020/09/01 04:19:22 UTC

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Hi Timo,

Thanks a lot for the great proposal and sorry for the late reply. This is
an important improvement for DataStream and Table API users.

I have listed my thoughts and questions below ;-)

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of
a QueryOperation tree in the validation phase."
IIUC, that means `system_rowtime()` can only be used in the first
`select()` after `fromXxxStream()`, right?
However, I think `system_proctime()` shouldn't have this limitation,
because it doesn't rely on the underlying timestamp of StreamRecord and
can be generated in any stage of the query.

2. "By using `system_rowtime().as("rowtime")` the watermark would be
assigned implicitly. "
Which watermark will be used here? Is it the watermark pre-assigned in the
DataStream (the so-called `system_watermark()`)?

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
DataStream<Row>"
I'm not sure whether this method is useful for users. Currently,
`DynamicTableSink#getChangelogMode(changelogMode)` is only used for
filtering out UPDATE_BEFORE where possible.
However, if we expose this method to users, it may be confusing. Users may
try to use this method to convert a changelog stream to an insert-only
stream by applying ChangelogMode.insertOnly(). This might be misleading.
What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have
to know the ChangelogMode of the current Table first, and then remove
UPDATE_BEFORE from that ChangelogMode.
Does that mean we have to support `Table.getChangelogMode()` first? But
`ChangelogMode` derivation requires a full optimization path on the Table,
which seems impossible for now.
Therefore, IMHO, we can introduce this interface in the future if users
indeed need this. For most users, I think `toChangelogStream(Table)` is
enough.
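
To illustrate the concern in point 3, here is a minimal, Flink-free sketch of what filtering UPDATE_BEFORE amounts to. `ChangelogSketch`, `Kind` and `Change` are hypothetical stand-ins invented for this example, not Flink classes. Dropping UPDATE_BEFORE removes one of the two records emitted per update, but the result is still a changelog (it keeps UPDATE_AFTER and DELETE), so it does not turn into an insert-only stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink.
final class ChangelogSketch {
    // Drops UPDATE_BEFORE records; every other kind passes through unchanged.
    static List<Change> dropUpdateBefore(List<Change> changelog) {
        List<Change> out = new ArrayList<>();
        for (Change c : changelog) {
            if (c.kind != Kind.UPDATE_BEFORE) {
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
            new Change(Kind.INSERT, "a=1"),
            new Change(Kind.UPDATE_BEFORE, "a=1"),
            new Change(Kind.UPDATE_AFTER, "a=2"),
            new Change(Kind.DELETE, "a=2"));
        // Prints INSERT, UPDATE_AFTER and DELETE: still not insert-only.
        for (Change c : dropUpdateBefore(log)) {
            System.out.println(c.kind + " " + c.payload);
        }
    }
}

// Hypothetical stand-in for a row kind; NOT Flink's RowKind.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical stand-in for a changelog record.
final class Change {
    final Kind kind;
    final String payload;

    Change(Kind kind, String payload) {
        this.kind = kind;
        this.payload = payload;
    }
}
```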

4. "Table.execute(ChangelogMode)"
Ditto.

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"
I think the potential drawback is that it can't support multi-sink
optimization, i.e. sharing a pipeline.
For example, suppose we have a Table `t1` (a heavy view that uses joins and
aggregates) and want to sink it to "mysql" using SQL while continuing to
process it with DataStream in the same job.
It's a huge waste of resources if we re-compute `t1`. It would be nice if
we could come up with a solution to share the pipeline.

I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What
do you think about the following proposal?

interface StatementSet {
   StatementSet addDataStream(Table table, TableDataStreamTransform transform);
}

interface TableDataStreamTransform {
   void transform(Context context);

   interface Context {
       Table getTable();
       DataStream<Row> toInsertStream(Table table);
       <T> DataStream<T> toInsertStream(AbstractDataType<?> dataType, Table table);
       DataStream<Row> toChangelogStream(Table table);
   }
}

tEnv
  .createStatementSet()
  .addInsert("mysql", table1)
  .addDataStream(table1, ctx -> {
      ctx.toInsertStream(ctx.getTable())
        .flatMap(..)
        .keyBy(..)
        .process(..)
        .addSink(...);
  });


## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
- Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is
enough and handier than a Map?
- Currently, the fieldNames member variable is mutable. Is that on purpose?
Can we make it immutable, for example by only accepting it in the constructor?
- Why do we accept a nullable `fieldNames`?
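
The list-based alternative in point 6 could look roughly like this: the caller passes the field names in order and the name-to-index Map is derived from them once. This is only a sketch; `FieldIndexSketch` and `toIndexMap` are hypothetical names, not part of `Row` or of the FLIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Flink.
final class FieldIndexSketch {
    // Derives an immutable name -> position map from an ordered list of names.
    static Map<String, Integer> toIndexMap(List<String> fieldNames) {
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            // put() returns the previous value, so non-null means a duplicate name.
            if (index.put(fieldNames.get(i), i) != null) {
                throw new IllegalArgumentException(
                    "Duplicate field name: " + fieldNames.get(i));
            }
        }
        return Collections.unmodifiableMap(index);
    }

    public static void main(String[] args) {
        Map<String, Integer> index = toIndexMap(List.of("myAge", "myName"));
        System.out.println(index.get("myAge"));  // prints 0
        System.out.println(index.get("myName")); // prints 1
    }
}
```

If the map is built once per converter or serializer and then shared, the per-record cost stays at a single lookup.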

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Sorry, I don't fully understand what `hasFieldOrder` means and what it is
used for. Could you explain this a bit more?
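
For readers of the archive, here is a minimal sketch of how such a flag might behave, going by the explanation given in the replies quoted further down in this thread: a row coming from the runtime has a fixed name-to-position map, while a user-created row assigns positions to names on first use. `SketchRow` is a hypothetical stand-in written for this example, not Flink's `Row`, and the behaviour for unknown names is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; NOT org.apache.flink.types.Row.
final class SketchRow {
    private final Object[] fields;
    private final Map<String, Integer> positionByName;
    private final boolean hasFieldOrder;

    // User-created row: names are assigned positions in order of first use.
    SketchRow(int arity) {
        this.fields = new Object[arity];
        this.positionByName = new LinkedHashMap<>();
        this.hasFieldOrder = false;
    }

    // Runtime-created row: positions are fixed by the given map.
    SketchRow(int arity, Map<String, Integer> fixedPositions) {
        this.fields = new Object[arity];
        this.positionByName = new LinkedHashMap<>(fixedPositions);
        this.hasFieldOrder = true;
    }

    void setField(int pos, Object value) {
        fields[pos] = value;
    }

    void setField(String name, Object value) {
        Integer pos = positionByName.get(name);
        if (pos == null) {
            if (hasFieldOrder) {
                // Assumption: a fixed order rejects names it does not know.
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            pos = positionByName.size(); // next free name slot
            positionByName.put(name, pos);
        }
        fields[pos] = value;
    }

    Object getField(int pos) {
        return fields[pos];
    }

    Object getField(String name) {
        Integer pos = positionByName.get(name);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return fields[pos];
    }

    public static void main(String[] args) {
        SketchRow row = new SketchRow(2);
        row.setField("myName", "Alice"); // first name used -> position 0
        row.setField("myAge", 32);       // second name used -> position 1
        System.out.println(row.getField(0)); // prints Alice
        System.out.println(row.getField(1)); // prints 32
    }
}
```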

Best,
Jark


On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:

> Hi David,
>
> thanks for your feedback. Feedback from someone who interacts with many
> users is very valuable. I added an explanation for StatementSets to the
> FLIP.
>
> Regarding watermarks and fromInsertStream, actually the
>
> `Schema.watermark("ts", system_watermark())`
>
> is not really necessary in the `fromChangelogStream`. It is added to
> satisfy the Schema interface and be similar to SQL DDL.
>
> We could already extract the watermark strategy if we see
> `system_rowtime()` because in most of the cases we will simply use the
> DataStream API watermarks.
>
> But maybe some users want to generate watermarks after preprocessing in
> DataStream API. In these cases users want to define a computed watermark
> expression.
>
> So for simplicity in the Simple API we introduce:
>
> tEnv
>    .fromInsertStream(DataStream<T>)
>    .select('*, system_rowtime().as("rowtime"),
> system_proctime().as("proctime"))
>
> and just rely on the watermarks that travel through DataStream API
> already. I added another comment to the FLIP.
>
> Regards,
> Timo
>
>
> On 19.08.20 10:53, David Anderson wrote:
> > Timo, nice to see this.
> >
> > As someone who expects to use these interfaces, but who doesn't fully
> > understand the existing Table API, I like what I see. Just a couple of
> > comments:
> >
> > The way that watermarks fit into the fromChangelogStream case makes sense
> > to me, and I'm wondering why watermarks don't come up in the previous
> > section about fromInsertStream.
> >
> > I wasn't familiar with StatementSets, and I couldn't find an explanation in
> > the docs. I eventually found this short paragraph in an email from Fabian
> > Hueske, which clarified everything in that section for me:
> >
> >      FLIP-84 [1] added the concept of a "statement set" to group multiple INSERT
> >      INTO statements (SQL or Table API) together. The statements in a statement
> >      set are jointly optimized and executed as a single Flink job.
> >
> > Maybe if you add this to the FLIP it will help other readers as well.
> >
> > Best,
> > David
> >
> > On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to propose a FLIP that aims to resolve the remaining
> >> shortcomings in the Table API:
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>
> >> The Table API has received many new features over the last year. It
> >> supports a new type system (FLIP-37), connectors support changelogs
> >> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >> support for result retrieval in an interactive fashion (FLIP-84), and
> >> soon new TableDescriptors (FLIP-129).
> >>
> >> However, the interfaces from and to DataStream API have not been touched
> >> during the introduction of these new features and are kind of outdated.
> >> The interfaces lack important functionality that is available in Table
> >> API but not exposed to DataStream API users. DataStream API is still our
> >> most important API which is why a good interoperability is crucial.
> >>
> >> This FLIP is a mixture of different topics that improve the
> >> interoperability between DataStream and Table API in terms of:
> >>
> >> - DataStream <-> Table conversion
> >> - translation of type systems TypeInformation <-> DataType
> >> - schema definition (incl. rowtime, watermarks, primary key)
> >> - changelog handling
> >> - row handling in DataStream API
> >>
> >> I'm looking forward to your feedback.
> >>
> >> Regards,
> >> Timo
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Jingsong raised a good point. We need to be more careful when deprecating
APIs.
For example, tEnv#createTemporaryView was introduced in release-1.10. Users
became familiar with this API in the previous release, but now we want to
deprecate it in the next one.

I also have some concerns about deprecating `.rowtime()` and `.proctime()`. I
agree that applying expressions to non-existent fields is confusing.
However, these APIs have been around since the early days of the Table API
and are heavily used.
So I think the confusion shouldn't be a big problem: users have
already accepted it, and `.rowtime()` and `.proctime()` make for a more
fluent API.
Dropping a heavily used API and making users learn a new one will hurt
them a lot. Could we keep the old API and introduce the new one (which is
the advanced one) alongside it?

In short, I'm +1 to keep `fromDataStream`, which is more
straightforward than `fromInsertStream` for batch users and most streaming
users.
Besides, if we want to have a counterpart on the sink side, maybe
we can have `toDataStream` and deprecate `toAppendStream`.

Best,
Jark

On Wed, 2 Sep 2020 at 11:55, Jingsong Li <ji...@gmail.com> wrote:

> Thanks Timo for driving.
>
> My first impression is: can we avoid deprecating these APIs?
> - StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> - StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...):
> Table
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>,
> Expression...): Unit
> - StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit
> - StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz):
> DataStream<T>
> - StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T>
> typeInfo): DataStream<T>
>
> I think these are commonly used APIs. My feeling is that
> the API of the table layer is changing too fast, with a lot of
> changes in each version. Even if they are only marked "deprecated", they will be
> removed one day. We should avoid the change unless there's a strong reason.
>
> 1.fromDataStream VS fromInsertStream:
> - In big data systems, and in our previous designs and APIs (including DDL), the
> default is pure insert. CDC, upsert, and delete came later as
> supplementary extensions. So insert is the default that users know and
> are familiar with, and I think "fromDataStream" can stay
> as it is.
>
> 2.toAppendStream VS toInsertStream:
> - What is the difference between append and insert? I don't think there is
> a clear distinction between them in our daily discussions. For me, Append
> is OK.
>
> 3.Calling `.rowtime()` and `.proctime()` on fields that don't exist caused
> further misunderstandings
> - This API is also widely used, even in our test code. Although we have
> introduced DDL, our test code has not been switched.
> - On the misunderstandings themselves: can we remove them
> by modifying the behavior, for example by not allowing duplicate names?
> As far as I know, using a new column name is the most common usage.
>
> 4.toAppendStream(Table table, Class<T>/TypeInformation)
> - I know an AbstractDataType is more powerful, but I think a simple class
> or TypeInformation is easier for DataStream users to accept. It is simpler, and
> they don't have to care about data types.
>
> I don't have a strong opinion on these, but I feel it's best not to have an
> impact on non-CDC users.
>
> Best,
> Jingsong
>
> On Tue, Sep 1, 2020 at 9:10 PM Timo Walther <tw...@apache.org> wrote:
>
> > Thanks for the healthy discussion Jark and Dawid.
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > Yes, I'm concerned about the per-record performance. A converter
> > or serializer should prepare an immutable Map instance up front (stored in
> > a member variable) that is simply passed to every new Row instance.
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> >
> > The accumulator code in the FLIP is just an example, sure in this
> > example we could use a POJO. But in general it should also be easy for
> > DataStream API users to quickly create a Row and use names instead of
> > indices for code readability.
> >
> > I think we should not add too much validation to the setters to keep the
> > runtime overhead low.
> >
> > Users should not mix position-based and string-based setters if they
> > construct rows themselves. If they do, the result depends on the calling
> > order. IMHO this should be straightforward once the concept is clear.
> >
> > Row row = new Row(2);
> > row.setField(0, "ABC"); // always position 0
> > row.setField(1, "ABC"); // always position 1
> > row.setField("f1", "ABC"); // position 0 because first usage of "f1"
> > row.setField("f0", "ABC"); // position 1 because first usage of "f0"
> > row.setField("f1", "ABC"); // position 0 because second usage of "f1"
> >
> > Row row = new Row(2);
> > row.setField("f0", "ABC"); // position 0 because first usage of "f0"
> > row.setField(0, "ABC");    // always position 0
> >
> > Row row = new Row(2, fieldNames);
> > row.setField(0, "ABC"); // always position 0
> > row.setField("f1", "ABC"); // position defined by fieldNames
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 14:51, Jark Wu wrote:
> > > Hi Timo,
> > >
> > > Thanks for the quick response.
> > >
> > > 5. "StreamStatementSet#attachToStream()"
> > > Joining or using connect() with a different DataStream is a good case.
> > > cc @Godfrey , what do you think about the `attachToStream()` API?
> > >
> > > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >> We need a Map for constant time of mapping field name to index.
> > > But we can easily build the Map from the List<String> fieldNames in Row
> > > constructor.
> > > IMO, manually building the Map and mapping names to indices is verbose and
> > > error-prone.
> > > Are you concerned about the per-record performance?
> > >
> > > 7. "a Row has two modes represented by an internal boolean flag
> > > `hasFieldOrder`."
> > > Thanks for the explanation.
> > > Regarding case (b), I share Dawid's confusion about what the result is
> > > when index-based setters and name-based setters are mixed
> > > (esp. in foreach loops and if branches).
> > > TBH, I don't see a strong need for named setters. Using a Row as the UDAF
> > > accumulator is not as good as a POJO in terms of performance and ease of use.
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I really like the ideas of this FLIP. I think it improves user
> > >> experience quite a bit. I wanted to add just two comments:
> > >>
> > >> 1. As for the StatementSet I like the approach described in the FLIP for
> > >> its simplicity. Moreover, the way I see it, if a user wants to
> > >> work with DataStream, then he/she wants to end up in the DataStream API,
> > >> or in other words call StreamExecutionEnvironment#execute.
> > >>
> > >> 2. @Timo What is the interaction between Row setters from the different
> > >> modes? What happens if the user calls both in a different order? E.g.
> > >>
> > >> row.setField(0, "ABC");
> > >>
> > >> row.setField("f0", "ABC"); // is this a valid call ?
> > >>
> > >> or
> > >>
> > >> row.setField("f0", "ABC");
> > >>
> > >> row.setField(0, "ABC"); // is this a valid call ?
> > >>
> > >> or
> > >>
> > >> row.setFieldNames(...);
> > >>
> > >> row.setField(0, "ABC"); // is this a valid call ?
> > >>
> > >> Best,
> > >>
> > >> Dawid
> > >>
> > >> On 01/09/2020 11:49, Timo Walther wrote:
> > >>> Hi Jark,
> > >>>
> > >>> thanks for the detailed review. Let me answer your concerns:
> > >>>
> > >>> ## Conversion of DataStream to Table
> > >>>
> > >>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > >>> leaf of a QueryOperation tree in the validation phase."
> > >>> I'm fine with allowing `system_proctime` everywhere in the query. Also
> > >>> for SQL, I think we should have done that earlier already to give
> > >>> users the chance to have time-based operations also at later stages.
> > >>>
> > >>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > >>> assigned implicitly. "
> > >>> Yes, we just use the DataStream API watermark. `system_rowtime()` will
> > >>> just introduce a time attribute, the watermark travels to the Table
> > >>> API and into DataStream API without further code changes.
> > >>>
> > >>> ## Conversion of Table to DataStream
> > >>>
> > >>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > >>> DataStream<Row>"
> > >>> 4. "Table.execute(ChangelogMode)"
> > >>> Filtering UPDATE_BEFORE is already quite important as it reduces the
> > >>> amount of data by a factor of 2. But I also understand your concerns
> > >>> about confusing users. I also got the request for a
> > >>> `Table.getChangelogMode()` a couple of times in the past, because
> > >>> users would like to get information about the kind of query that is
> > >>> executed. However, in this case `toChangelogStream(Table)` is
> > >>> equivalent to calling `toChangelogStream(Table.getChangelogMode(),
> > >>> Table)` so we don't need `Table.getChangelogMode()` in the current
> > >>> FLIP design. But this can be future work. Let's start with
> > >>> `toChangelogStream(Table)` and wait for more feedback about this new
> > >>> feature. What do others think?
> > >>>
> > >>> ## Conversion of StatementSet to DataStream API
> > >>>
> > >>> 5. "StreamStatementSet#attachToStream()"
> > >>>
> > >>> I think Godfrey's proposal is too complex for regular users. Instead
> > >>> of continuing with the fluent programming, we would force users to
> > >>> define a DataStream pipeline in a lambda.
> > >>>
> > >>> Furthermore, joining or using connect() with a different DataStream
> > >>> source would not be possible in this design.
> > >>>
> > >>> The `execute()` method of `StatementSet` should not execute the
> > >>> DataStream API subprogram. That would mix the concepts, because we tell
> > >>> users: "If you use `toDataStream`, you need to use
> > >>> `StreamExecutionEnvironment.execute()`."
> > >>>
> > >>> We don't solve every potential use case with the current FLIP design
> > >>> but the most important one where a pipeline just uses an INSERT INTO
> > >>> but also uses Table API for connectors and preprocessing and does the
> > >>> main logic in DataStream API:
> > >>>
> > >>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
> > >>>
> > >>> I would consider `StatementSet.addDataStream(Table, ...)` future work
> > >>> for now as it is only an optimization for reusing parts of the
> > >>> StreamGraph. We could even perform this optimization when calling
> > >>> `toInsertStream` or `toChangelogStream`.
> > >>>
> > >>> ## Improve dealing with Row in DataStream API
> > >>>
> > >>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>
> > >>> We need a Map for constant time of mapping field name to index.
> > >>>
> > >>> We accept a nullable `fieldNames` because names are not mandatory; one
> > >>> can also work with indices as before.
> > >>>
> > >>> But you are right that the fieldNames member variable can be
> > >>> immutable. I just wanted to avoid too many overloaded constructors.
> > >>> I'm fine with having one full constructor for RowKind, arity and field
> > >>> names (or null).
> > >>>
> > >>> 7. "a Row has two modes represented by an internal boolean flag
> > >>> `hasFieldOrder`."
> > >>> Maybe I leaked too many implementation details there that
> > >>> confuse readers rather than help. Internally, we need to distinguish between
> > >>> two kinds of rows. A user should not be bothered by this.
> > >>>
> > >>> a) Row comes from Table API runtime: hasFieldOrder = true
> > >>> Map("myAge" -> 0, "myName" -> 1)
> > >>>
> > >>> row.getField("myName") == row.getField(1)
> > >>> row.getField("myAge") == row.getField(0)
> > >>>
> > >>> b) Row comes from user: hasFieldOrder = false
> > >>> Row row = new Row(2);
> > >>> row.setField("myName", "Alice");
> > >>> row.setField("myAge", 32);
> > >>>
> > >>> Map("myAge" -> 1, "myName" -> 0)
> > >>>
> > >>> But the type information will decide about the order of the fields
> > >>> later and reorder them accordingly during serialization or RowData
> > >>> conversion:
> > >>>
> > >>> ["myName", "myAge"] vs. ["myAge", "myName"]
> > >>>
> > >>> The user does not need to care about this, as dealing with rows
> > >>> always feels natural.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>>
> > >>> On 01.09.20 06:19, Jark Wu wrote:
> > >>>> Hi Timo,
> > >>>>
> > >>>> Thanks a lot for the great proposal and sorry for the late reply.
> > >>>> This is
> > >>>> an important improvement for DataStream and Table API users.
> > >>>>
> > >>>> I have listed my thoughts and questions below ;-)
> > >>>>
> > >>>> ## Conversion of DataStream to Table
> > >>>>
> > >>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > >>>> leaf of
> > >>>> a QueryOperation tree in the validation phase."
> > >>>> IIUC, that means `system_rowtime()` can only be used in the first
> > >>>> `select()` after `fromXxxStream()`, right?
> > >>>> However, I think `system_proctime()` shouldn't have this limitation,
> > >>>> because it doesn't rely on the underlying timestamp of StreamRecord
> > and
> > >>>>    can be generated in any stage of the query.
> > >>>>
> > >>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > >>>> assigned implicitly. "
> > >>>> What watermark will be used here? Is the pre-assigned watermark in
> the
> > >>>> DataStream (so-called `system_watermark()`)?
> > >>>>
> > >>>> ## Conversion of Table to DataStream
> > >>>>
> > >>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > >>>> DataStream<Row>"
> > >>>> I'm not sure whether this method is useful for users. Currently, the
> > >>>> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
> > >>>> filtering UPDATE_BEFORE if possible.
> > >>>> However, if we expose this method to users, it may be confusing.
> > >>>> Users may
> > >>>> try to use this method to convert a changelog stream to an
> insert-only
> > >>>> stream by applying ChangelogMode.insertOnly(). This might be
> > misleading.
> > >>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> > >>>> have
> > >>>> to know the ChangelogMode of the current Table first, and remove
> > >>>> UPDATE_BEFORE from the ChangelogMode.
> > >>>> That means we have to support `Table.getChangelogMode()` first? But
> > >>>> `ChangelogMode` derivation requires a full optimization path on the
> > >>>> Table,
> > >>>> which seems impossible now.
> > >>>> Therefore, IMHO, we can introduce this interface in the future if
> > users
> > >>>> indeed need this. For most users, I think `toChangelogStream(Table)`
> > is
> > >>>> enough.
> > >>>>
> > >>>> 4. "Table.execute(ChangelogMode)"
> > >>>> Ditto.
> > >>>>
> > >>>> ## Conversion of StatementSet to DataStream API
> > >>>>
> > >>>> 5. "StreamStatementSet#attachToStream()"
> > >>>> I think the potential drawback is that it can't support multi-sink
> > >>>> optimization, i.e. share pipeline.
> > >>>> For example, if we have a Table `t1` (a heavy view uses join,
> > >>>> aggregate),
> > >>>> and want to sink to "mysql" using SQL and want to continue
> processing
> > >>>> using
> > >>>> DataStream in a job.
> > >>>> It's a huge waste of resources if we re-compute `t1`. It would be
> > >>>> nice if
> > >>>> we can come up with a solution to share the pipeline.
> > >>>>
> > >>>> I borrowed Godfrey's idea in FLINK-18840 and added some
> > >>>> modifications. What
> > >>>> do you think about the following proposal?
> > >>>>
> > >>>> interface StatementSet {
> > >>>>      StatementSet addDataStream(Table table,
> TableDataStreamTransform
> > >>>> transform);
> > >>>> }
> > >>>>
> > >>>> interface TableDataStreamTransform {
> > >>>>      void transform(Context);
> > >>>>
> > >>>>      interface Context {
> > >>>>          Table getTable();
> > >>>>          DataStream<Row> toInsertStream(Table);
> > >>>>          DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> > >>>>          DataStream<Row> toChangelogStream(Table);
> > >>>>      }
> > >>>> }
> > >>>>
> > >>>> tEnv
> > >>>>     .createStatementSet()
> > >>>>     .addInsert("mysql", table1)
> > >>>>     .addDataStream(table1, ctx -> {
> > >>>>         ctx.toInsertStream(ctx.getTable())
> > >>>>           .flatMap(..)
> > >>>>           .keyBy(..)
> > >>>>           .process(..)
> > >>>>           .addSink(...);
> > >>>>     })
> > >>>>
> > >>>>
> > >>>> ## Improve dealing with Row in DataStream API
> > >>>>
> > >>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> > >>>> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter
> > is
> > >>>> enough and more handy than Map ?
> > >>>> - Currently, the fieldNames member variable is mutable, is it on
> > >>>> purpose?
> > >>>> Can we make it immutable? For example, only accept from the
> > constructor.
> > >>>> - Why do we accept a nullable `fieldNames`?
> > >>>>
> > >>>> 7. "a Row has two modes represented by an internal boolean flag
> > >>>> `hasFieldOrder`."
> > >>>> Sorry, I don't fully understand what does the `hasFieldOrder` mean
> > >>>> and is
> > >>>> used for. Could you explain a bit more for this?
> > >>>>
> > >>>> Best,
> > >>>> Jark
> > >>>>
> > >>>>
> > >>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org>
> > wrote:
> > >>>>
> > >>>>> Hi David,
> > >>>>>
> > >>>>> thanks for your feedback. Feedback from someone who interacts with
> > many
> > >>>>> users is very valuable. I added an explanation for StatementSets to
> > the
> > >>>>> FLIP.
> > >>>>>
> > >>>>> Regarding watermarks and fromInsertStream, actually the
> > >>>>>
> > >>>>> `Schema.watermark("ts", system_watermark())`
> > >>>>>
> > >>>>> is not really necessary in the `fromChangelogStream`. It is added
> to
> > >>>>> satisfy the Schema interface and be similar to SQL DDL.
> > >>>>>
> > >>>>> We could already extract the watermark strategy if we see
> > >>>>> `system_rowtime()` because in most of the cases we will simply use
> > the
> > >>>>> DataStream API watermarks.
> > >>>>>
> > >>>>> But maybe some users want to generate watermarks after
> preprocessing
> > in
> > >>>>> DataStream API. In these cases users want to define a computed watermark
> > >>>>> expression.
> > >>>>>
> > >>>>> So for simplicity in the Simple API we introduce:
> > >>>>>
> > >>>>> tEnv
> > >>>>>      .fromInsertStream(DataStream<T>)
> > >>>>>      .select('*, system_rowtime().as("rowtime"),
> > >>>>> system_proctime().as("proctime"))
> > >>>>>
> > >>>>> and just rely on the watermarks that travel through DataStream API
> > >>>>> already. I added another comment to the FLIP.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
> > >>>>>
> > >>>>>
> > >>>>> On 19.08.20 10:53, David Anderson wrote:
> > >>>>>> Timo, nice to see this.
> > >>>>>>
> > >>>>>> As someone who expects to use these interfaces, but who doesn't
> > fully
> > >>>>>> understand the existing Table API, I like what I see. Just a
> couple
> > of
> > >>>>>> comments:
> > >>>>>>
> > >>>>>> The way that watermarks fit into the fromChangelogStream case
> makes
> > >>>>>> sense
> > >>>>>> to me, and I'm wondering why watermarks don't come up in the
> > previous
> > >>>>>> section about fromInsertStream.
> > >>>>>>
> > >>>>>> I wasn't familiar with StatementSets, and I couldn't find an
> > >>>>>> explanation
> > >>>>> in
> > >>>>>> the docs. I eventually found this short paragraph in an email from
> > >>>>>> Fabian
> > >>>>>> Hueske, which clarified everything in that section for me:
> > >>>>>>
> > >>>>>>        FLIP-84 [1] added the concept of a "statement set" to group
> > >>>>>> multiple
> > >>>>>> INSERT
> > >>>>>>        INTO statements (SQL or Table API) together. The statements
> > in a
> > >>>>>> statement
> > >>>>>>        set are jointly optimized and executed as a single Flink
> job.
> > >>>>>>
> > >>>>>> Maybe if you add this to the FLIP it will help other readers as
> > well.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> David
> > >>>>>>
> > >>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twalthr@apache.org
> >
> > >>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> I would like to propose a FLIP that aims to resolve the remaining
> > >>>>>>> shortcomings in the Table API:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > >>>>>
> > >>>>>>>
> > >>>>>>> The Table API has received many new features over the last year.
> It
> > >>>>>>> supports a new type system (FLIP-37), connectors support
> changelogs
> > >>>>>>> (FLIP-95), we have well defined internal data structures
> (FLIP-95),
> > >>>>>>> support for result retrieval in an interactive fashion (FLIP-84),
> > and
> > >>>>>>> soon new TableDescriptors (FLIP-129).
> > >>>>>>>
> > >>>>>>> However, the interfaces from and to DataStream API have not been
> > >>>>>>> touched
> > >>>>>>> during the introduction of these new features and are kind of
> > >>>>>>> outdated.
> > >>>>>>> The interfaces lack important functionality that is available in
> > >>>>>>> Table
> > >>>>>>> API but not exposed to DataStream API users. DataStream API is
> > >>>>>>> still our
> > >>>>>>> most important API which is why a good interoperability is
> crucial.
> > >>>>>>>
> > >>>>>>> This FLIP is a mixture of different topics that improve the
> > >>>>>>> interoperability between DataStream and Table API in terms of:
> > >>>>>>>
> > >>>>>>> - DataStream <-> Table conversion
> > >>>>>>> - translation of type systems TypeInformation <-> DataType
> > >>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
> > >>>>>>> - changelog handling
> > >>>>>>> - row handling in DataStream API
> > >>>>>>>
> > >>>>>>> I'm looking forward to your feedback.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
> --
> Best, Jingsong Lee
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Timo for driving.

My first impression is: can we avoid deprecating these APIs?
- StreamTableEnvironment.fromDataStream(DataStream<T>): Table
- StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...):
Table
- StreamTableEnvironment.createTemporaryView(String, DataStream<T>,
Expression...): Unit
- StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit
- StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz):
DataStream<T>
- StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T>
typeInfo): DataStream<T>

I think these are commonly used APIs. My feeling is that
the API of the table layer is changing too fast, with a lot of
changes in each version. Even if they are only marked "deprecated", they will be
removed one day. We should avoid the change unless there's a strong reason.

1.fromDataStream VS fromInsertStream:
- In big data systems, and in our previous designs and APIs (including DDL), the
default is pure insert. CDC, upsert, and delete came later as
supplementary extensions. So insert is the default that users know and
are familiar with, and I think "fromDataStream" can stay
as it is.

2.toAppendStream VS toInsertStream:
- What is the difference between append and insert? I don't think there is
a clear distinction between them in our daily discussions. For me, Append
is OK.

3.Calling `.rowtime()` and `.proctime()` on fields that don't exist caused
further misunderstandings
- This API is also widely used, even in our test code. Although we have
introduced DDL, our test code has not been switched.
- On the misunderstandings themselves: can we remove them
by modifying the behavior, for example by not allowing duplicate names?
As far as I know, using a new column name is the most common usage.

4. toAppendStream(Table table, Class<T>/TypeInformation):
- I know an AbstractDataType is more powerful, but I think a simple Class
or TypeInformation is simpler and easier for DataStream users to accept;
it lets them avoid dealing with data types at all.
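
To make point 3 concrete, here is a minimal sketch of what "duplicate names
are not allowed" could look like, written in plain Java without Flink
dependencies. The class `TimeAttributeNames` and the method
`appendTimeAttribute` are hypothetical names made up for illustration; they
are not part of Flink's API, and the exact error behavior is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not a Flink class: a time attribute must always be a new column. */
class TimeAttributeNames {

    /** Returns the schema with the time attribute appended, rejecting name clashes. */
    static List<String> appendTimeAttribute(List<String> existingFields, String attributeName) {
        if (existingFields.contains(attributeName)) {
            throw new IllegalArgumentException(
                    "Time attribute '" + attributeName + "' clashes with an existing field");
        }
        List<String> result = new ArrayList<>(existingFields);
        result.add(attributeName);
        return result;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("user", "amount");
        // A new column name is accepted and appended at the end.
        System.out.println(appendTimeAttribute(fields, "rowtime")); // [user, amount, rowtime]
        try {
            // Reusing an existing name is rejected instead of silently replacing the field.
            appendTimeAttribute(fields, "amount");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With such a rule, `.rowtime()` and `.proctime()` would have a single meaning
(declare a new column) instead of two that depend on whether the name exists.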

I don't have a strong opinion on these, but I feel it's best not to have an
impact on non-CDC users.

Best,
Jingsong

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Thanks for the healthy discussion Jark and Dawid.

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"

Yes, I'm concerned about the per-record performance. A converter
or serializer should prepare an immutable Map instance up front (stored in
a member variable) that is simply passed to every new Row instance.
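
A minimal sketch of that idea in plain Java, without Flink dependencies: the
name-to-index map is built once from a list of field names, and the same
immutable instance is handed out for every row. `NamedRowConverter` and
`positionsForNewRow` are hypothetical names for illustration only; they are
not part of the FLIP or of Flink.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical converter: builds the name-to-index map once and reuses it per record. */
class NamedRowConverter {
    // Prepared once, stored in a member variable, never mutated afterwards.
    private final Map<String, Integer> fieldPositions;

    NamedRowConverter(List<String> fieldNames) {
        Map<String, Integer> positions = new HashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (positions.put(fieldNames.get(i), i) != null) {
                throw new IllegalArgumentException("Duplicate field name: " + fieldNames.get(i));
            }
        }
        this.fieldPositions = Collections.unmodifiableMap(positions);
    }

    /** Per-record path: no map is built here, only the shared instance is returned. */
    Map<String, Integer> positionsForNewRow() {
        return fieldPositions;
    }

    public static void main(String[] args) {
        NamedRowConverter converter = new NamedRowConverter(Arrays.asList("myAge", "myName"));
        Map<String, Integer> first = converter.positionsForNewRow();
        Map<String, Integer> second = converter.positionsForNewRow();
        System.out.println(first == second);     // true: same instance for every row
        System.out.println(first.get("myName")); // 1
    }
}
```

The per-record cost is then a reference copy plus one hash lookup per named
access, which is the reason for passing a Map rather than a List.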

7. "a Row has two modes represented by an internal boolean flag 
`hasFieldOrder`."

The accumulator code in the FLIP is just an example; sure, in this
example we could use a POJO. But in general it should also be easy for
DataStream API users to quickly create a Row and use names instead of
indices for code readability.

I think we should not add too much validation to the setters, to keep the
runtime overhead low.

Users should not mix position-based and string-based setters if they
construct rows themselves. If they do, the result depends on the calling
order. IMHO this should be straightforward once the concept is clear.

Row row = new Row(2);
row.setField(0, "ABC"); // always position 0
row.setField(1, "ABC"); // always position 1
row.setField("f1", "ABC"); // position 0 because first usage of "f1"
row.setField("f0", "ABC"); // position 1 because first usage of "f0"
row.setField("f1", "ABC"); // position 0 because second usage of "f1"

Row row = new Row(2);
row.setField("f0", "ABC"); // position 0 because first usage of "f0"
row.setField(0, "ABC");    // always position 0

Row row = new Row(2, fieldNames);
row.setField(0, "ABC"); // always position 0
row.setField("f1", "ABC"); // position defined by fieldNames
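
The calling-order rules in the three snippets above can be emulated with a
small stand-alone class. This is only a sketch of the semantics as described
in this mail, not the actual `Row` implementation; `SketchRow` is a
hypothetical name, and the error behavior for unknown names or for more names
than fields is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Row; emulates only the name/position rules described above. */
class SketchRow {
    private final Object[] fields;
    private final Map<String, Integer> positions;
    private final boolean hasFieldOrder;

    /** User-created row: names receive positions in order of first usage. */
    SketchRow(int arity) {
        this.fields = new Object[arity];
        this.positions = new HashMap<>();
        this.hasFieldOrder = false;
    }

    /** Row with predefined names: positions come from the given map. */
    SketchRow(int arity, Map<String, Integer> fieldNames) {
        this.fields = new Object[arity];
        this.positions = Collections.unmodifiableMap(fieldNames);
        this.hasFieldOrder = true;
    }

    /** Position-based setter: always writes to the given position. */
    void setField(int pos, Object value) {
        fields[pos] = value;
    }

    /** Name-based setter: resolves the name, assigning a position on first usage. */
    void setField(String name, Object value) {
        Integer pos = positions.get(name);
        if (pos == null) {
            if (hasFieldOrder || positions.size() == fields.length) {
                throw new IllegalArgumentException("Cannot assign a position to field: " + name);
            }
            pos = positions.size(); // next position in order of first usage
            positions.put(name, pos);
        }
        fields[pos] = value;
    }

    Object getField(int pos) {
        return fields[pos];
    }

    public static void main(String[] args) {
        SketchRow row = new SketchRow(2);
        row.setField(0, "A");    // always position 0
        row.setField(1, "B");    // always position 1
        row.setField("f1", "C"); // first name used -> position 0, overwrites "A"
        row.setField("f0", "D"); // second name used -> position 1, overwrites "B"
        System.out.println(row.getField(0) + " " + row.getField(1)); // prints "C D"
    }
}
```

The sketch shows why mixing both setter kinds is discouraged: the name "f1"
ends up at position 0 only because it was the first name used.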

Regards,
Timo

On 01.09.20 14:51, Jark Wu wrote:
> Hi Timo,
> 
> Thanks for the quick response.
> 
> 5. "StreamStatementSet#attachToStream()"
> Joining or using connect() with a different DataStream is a good case.
> cc @Godfrey , what do you think about the `attachToStream()` API?
> 
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>> We need a Map for constant time of mapping field name to index.
> But we can easily build the Map from the List<String> fieldNames in Row
> constructor.
> IMO, manually building the Map and mapping names to indices is verbose and
> error-prone.
> Are you concerned about the per-record performance?
> 
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
> Thanks for the explanation.
> Regarding case (b), I share Dawid's confusion about what the result is
> when index-based and name-based setters are mixed
> (esp. in foreach and if branches).
> TBH, I don't see a strong need for named setters. Using it as the UDAF
> accumulator is not as good as a POJO in terms of performance and ease of use.
> 
> Best,
> Jark
> 
> On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
> wrote:
> 
>> Hi all,
>>
>> I really like the ideas of this FLIP. I think it improves user
>> experience quite a bit. I wanted to add just two comments:
>>
>> 1. As for the StatementSet I like the approach described in the FLIP for
>> its simplicity. Moreover the way I see it is that if a user wants to
>> work with DataStream, then he/she wants to end up in the DataStream API,
>> or in other words call the StreamExecutionEnvironment#execute.
>>
>> 2. @Timo What is the interaction between Row setters from the different
>> modes? What happens if the user calls both in a different order? E.g.
>>
>> row.setField(0, "ABC");
>>
>> row.setField("f0", "ABC"); // is this a valid call ?
>>
>> or
>>
>> row.setField("f0", "ABC");
>>
>> row.setField(0, "ABC"); // is this a valid call ?
>>
>> or
>>
>> row.setFieldNames(...);
>>
>> row.setField(0, "ABC"); // is this a valid call ?
>>
>> Best,
>>
>> Dawid
>>
>> On 01/09/2020 11:49, Timo Walther wrote:
>>> Hi Jark,
>>>
>>> thanks for the detailed review. Let me answer your concerns:
>>>
>>> ## Conversion of DataStream to Table
>>>
>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
>>> leaf of a QueryOperation tree in the validation phase."
>>> I'm fine with allowing `system_proctime` everywhere in the query. Also
>>> for SQL, I think we should have done that earlier already to give
>>> users the chance to have time based operations also at later stages.
>>>
>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
>>> assigned implicitly. "
>>> Yes, we just use the DataStream API watermark. `system_rowtime()` will
>>> just introduce a time attribute, the watermark travels to the Table
>>> API and into DataStream API without further code changes.
>>>
>>> ## Conversion of Table to DataStream
>>>
>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
>>> DataStream<Row>"
>>> 4. "Table.execute(ChangelogMode)"
>>> Filtering UPDATE_BEFORE is already quite important as it reduces the
>>> amount of data by a factor of 2. But I also understand your concerns
>>> about confusing users. I also got the request for a
>>> `Table.getChangelogMode()` a couple of times in the past, because
>>> users would like to get information about the kind of query that is
>>> executed. However, in this case `toChangelogStream(Table)` is
>>> equivalent to calling `toChangelogStream(Table.getChangelogMode(),
>>> Table)`, so we don't need `Table.getChangelogMode()` in the current
>>> FLIP design. But this can be future work. Let's start with
>>> `toChangelogStream(Table)` and wait for more feedback about this new
>>> feature. What do others think?
>>>
>>> ## Conversion of StatementSet to DataStream API
>>>
>>> 5. "StreamStatementSet#attachToStream()"
>>>
>>> I think Godfrey's proposal is too complex for regular users. Instead
>>> of continuing with the fluent programming, we would force users to
>>> define a DataStream pipeline in a lambda.
>>>
>>> Furthermore, joining or using connect() with a different DataStream
>>> source would not be possible in this design.
>>>
>>> The `execute()` method of `StatementSet` should not execute the
>>> DataStream API subprogram. It mixes the concepts because we tell
>>> users: "If you use `toDataStream`, you need to use
>>> `StreamExecutionEnvironment.execute()`."
>>>
>>> We don't solve every potential use case with the current FLIP design
>>> but the most important one where a pipeline just uses an INSERT INTO
>>> but also uses Table API for connectors and preprocessing and does the
>>> main logic in DataStream API:
>>>
>>> T1 -> T2, T3 -> DataStream, T4 -> DataStream
>>>
>>> I would consider `StatementSet.addDataStream(Table, ...)` future work
>>> for now as it is only an optimization for reusing parts of the
>>> StreamGraph. We could even perform this optimization when calling
>>> `toInsertStream` or `toChangelogStream`.
>>>
>>> ## Improve dealing with Row in DataStream API
>>>
>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>>>
>>> We need a Map for constant time of mapping field name to index.
>>>
>>> We accept a nullable `fieldNames` because names are not mandatory, one
>>> can also work with indices as before.
>>>
>>> But you are right that the fieldNames member variable can be
>>> immutable. I just wanted to avoid too many overloaded constructors.
>>> I'm fine with having one full constructor for RowKind, arity and field
>>> names (or null).
>>>
>>> 7. "a Row has two modes represented by an internal boolean flag
>>> `hasFieldOrder`."
>>> Maybe I leaked too many implementation details there that confuse
>>> readers rather than help them. Internally, we need to distinguish between
>>> two kinds of rows. A user should not be bothered by this.
>>>
>>> a) Row comes from Table API runtime: hasFieldOrder = true
>>> Map("myAge" -> 0, "myName" -> 1)
>>>
>>> row.getField("myName") == row.getField(1)
>>> row.getField("myAge") == row.getField(0)
>>>
>>> b) Row comes from user: hasFieldOrder = false
>>> Row row = new Row(2);
>>> row.setField("myName", "Alice");
>>> row.setField("myAge", 32);
>>>
>>> Map("myAge" -> 1, "myName" -> 0)
>>>
>>> But the type information will decide about the order of the fields
>>> later and reorder them accordingly during serialization or RowData
>>> conversion:
>>>
>>> ["myName", "myAge"] vs. ["myAge", "myName"]
>>>
>>> The user does not need to care about this, as dealing with the rows
>>> always feels natural.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 01.09.20 06:19, Jark Wu wrote:
>>>> Hi Timo,
>>>>
>>>> Thanks a lot for the great proposal and sorry for the late reply. This is
>>>> an important improvement for DataStream and Table API users.
>>>>
>>>> I have listed my thoughts and questions below ;-)
>>>>
>>>> ## Conversion of DataStream to Table
>>>>
>>>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of
>>>> a QueryOperation tree in the validation phase."
>>>> IIUC, that means `system_rowtime()` can only be used in the first
>>>> `select()` after `fromXxxStream()`, right?
>>>> However, I think `system_proctime()` shouldn't have this limitation,
>>>> because it doesn't rely on the underlying timestamp of StreamRecord and
>>>> can be generated in any stage of the query.
>>>>
>>>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
>>>> assigned implicitly. "
>>>> What watermark will be used here? Is it the pre-assigned watermark in the
>>>> DataStream (the so-called `system_watermark()`)?
>>>>
>>>> ## Conversion of Table to DataStream
>>>>
>>>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
>>>> DataStream<Row>"
>>>> I'm not sure whether this method is useful for users. Currently,
>>>> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
>>>> filtering UPDATE_BEFORE if possible.
>>>> However, if we expose this method to users, it may be confusing. Users may
>>>> try to use this method to convert a changelog stream to an insert-only
>>>> stream by applying ChangelogMode.insertOnly(). This might be misleading.
>>>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have
>>>> to know the ChangelogMode of the current Table first, and remove
>>>> UPDATE_BEFORE from the ChangelogMode.
>>>> That means we have to support `Table.getChangelogMode()` first? But
>>>> `ChangelogMode` derivation requires a full optimization path on the Table,
>>>> which seems impossible now.
>>>> Therefore, IMHO, we can introduce this interface in the future if users
>>>> indeed need this. For most users, I think `toChangelogStream(Table)` is
>>>> enough.
>>>>
>>>> 4. "Table.execute(ChangelogMode)"
>>>> Ditto.
>>>>
>>>> ## Conversion of StatementSet to DataStream API
>>>>
>>>> 5. "StreamStatementSet#attachToStream()"
>>>> I think the potential drawback is that it can't support multi-sink
>>>> optimization, i.e. sharing the pipeline.
>>>> For example, suppose we have a Table `t1` (a heavy view that uses joins and
>>>> aggregates) and want to sink it to "mysql" using SQL while continuing to
>>>> process it using DataStream in one job.
>>>> It's a huge waste of resources if we re-compute `t1`. It would be nice if
>>>> we can come up with a solution to share the pipeline.
>>>>
>>>> I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What
>>>> do you think about the following proposal?
>>>>
>>>> interface StatementSet {
>>>>      StatementSet addDataStream(Table table, TableDataStreamTransform transform);
>>>> }
>>>>
>>>> interface TableDataStreamTransform {
>>>>      void transform(Context context);
>>>>
>>>>      interface Context {
>>>>          Table getTable();
>>>>          DataStream<Row> toInsertStream(Table table);
>>>>          <T> DataStream<T> toInsertStream(AbstractDataType<?> dataType, Table table);
>>>>          DataStream<Row> toChangelogStream(Table table);
>>>>      }
>>>> }
>>>>
>>>> tEnv
>>>>     .createStatementSet()
>>>>     .addInsert("mysql", table1)
>>>>     .addDataStream(table1, ctx -> {
>>>>         ctx.toInsertStream(ctx.getTable())
>>>>           .flatMap(..)
>>>>           .keyBy(..)
>>>>           .process(..)
>>>>           .addSink(...);
>>>>     });
>>>>
>>>>
>>>> ## Improve dealing with Row in DataStream API
>>>>
>>>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>>>> - Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is
>>>> enough and handier than a Map?
>>>> - Currently, the fieldNames member variable is mutable. Is that on purpose?
>>>> Can we make it immutable? For example, only accept it from the constructor.
>>>> - Why do we accept a nullable `fieldNames`?
>>>>
>>>> 7. "a Row has two modes represented by an internal boolean flag
>>>> `hasFieldOrder`."
>>>> Sorry, I don't fully understand what `hasFieldOrder` means and what it is
>>>> used for. Could you explain this a bit more?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> thanks for your feedback. Feedback from someone who interacts with many
>>>>> users is very valuable. I added an explanation for StatementSets to the
>>>>> FLIP.
>>>>>
>>>>> Regarding watermarks and fromInsertStream, actually the
>>>>>
>>>>> `Schema.watermark("ts", system_watermark())`
>>>>>
>>>>> is not really necessary in the `fromChangelogStream`. It is added to
>>>>> satisfy the Schema interface and be similar to SQL DDL.
>>>>>
>>>>> We could already extract the watermark strategy if we see
>>>>> `system_rowtime()` because in most of the cases we will simply use the
>>>>> DataStream API watermarks.
>>>>>
>>>>> But maybe some users want to generate watermarks after preprocessing in
>>>>> DataStream API. In these cases users want to define a computed watermark
>>>>> expression.
>>>>>
>>>>> So for simplicity in the Simple API we introduce:
>>>>>
>>>>> tEnv
>>>>>      .fromInsertStream(DataStream<T>)
>>>>>      .select('*, system_rowtime().as("rowtime"),
>>>>> system_proctime().as("proctime"))
>>>>>
>>>>> and just rely on the watermarks that travel through DataStream API
>>>>> already. I added another comment to the FLIP.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 19.08.20 10:53, David Anderson wrote:
>>>>>> Timo, nice to see this.
>>>>>>
>>>>>> As someone who expects to use these interfaces, but who doesn't fully
>>>>>> understand the existing Table API, I like what I see. Just a couple of
>>>>>> comments:
>>>>>>
>>>>>> The way that watermarks fit into the fromChangelogStream case makes
>>>>>> sense
>>>>>> to me, and I'm wondering why watermarks don't come up in the previous
>>>>>> section about fromInsertStream.
>>>>>>
>>>>>> I wasn't familiar with StatementSets, and I couldn't find an
>>>>>> explanation
>>>>> in
>>>>>> the docs. I eventually found this short paragraph in an email from
>>>>>> Fabian
>>>>>> Hueske, which clarified everything in that section for me:
>>>>>>
>>>>>>        FLIP-84 [1] added the concept of a "statement set" to group
>>>>>> multiple
>>>>>> INSERT
>>>>>>        INTO statements (SQL or Table API) together. The statements in a
>>>>>> statement
>>>>>>        set are jointly optimized and executed as a single Flink job.
>>>>>>
>>>>>> Maybe if you add this to the FLIP it will help other readers as well.
>>>>>>
>>>>>> Best,
>>>>>> David
>>>>>>
>>>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>>>> shortcomings in the Table API:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>
>>>>>>>
>>>>>>> The Table API has received many new features over the last year. It
>>>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>>>> soon new TableDescriptors (FLIP-129).
>>>>>>>
>>>>>>> However, the interfaces from and to DataStream API have not been
>>>>>>> touched
>>>>>>> during the introduction of these new features and are kind of
>>>>>>> outdated.
>>>>>>> The interfaces lack important functionality that is available in
>>>>>>> Table
>>>>>>> API but not exposed to DataStream API users. DataStream API is
>>>>>>> still our
>>>>>>> most important API which is why a good interoperability is crucial.
>>>>>>>
>>>>>>> This FLIP is a mixture of different topics that improve the
>>>>>>> interoperability between DataStream and Table API in terms of:
>>>>>>>
>>>>>>> - DataStream <-> Table conversion
>>>>>>> - translation of type systems TypeInformation <-> DataType
>>>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>>>> - changelog handling
>>>>>>> - row handling in DataStream API
>>>>>>>
>>>>>>> I'm looking forward to your feedback.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Jark Wu <im...@gmail.com>.
Hi Timo,

Thanks for the quick response.

5. "StreamStatementSet#attachToStream()"
Joining or using connect() with a different DataStream is a good case.
cc @Godfrey, what do you think about the `attachToStream()` API?

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> We need a Map for constant time of mapping field name to index.
But we can easily build the Map from the List<String> fieldNames in the
Row constructor.
IMO, manually building the Map and mapping names to indices is verbose and
error-prone.
Are you concerned about the per-record performance?
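
As a rough illustration of the point above, here is a minimal sketch of
deriving the name-to-index Map once from a List<String>. `FieldNameIndex`
and `build` are hypothetical names used only for this example; they are not
part of Row or of the FLIP. The sketch assumes that duplicate or null names
should be rejected.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink's Row): builds a name -> index map once. */
final class FieldNameIndex {

    private FieldNameIndex() {}

    /** Maps each field name to its position in the list; rejects nulls and duplicates. */
    static Map<String, Integer> build(List<String> fieldNames) {
        Map<String, Integer> positions = new HashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            String name = fieldNames.get(i);
            if (name == null) {
                throw new IllegalArgumentException("Field name at index " + i + " is null");
            }
            // putIfAbsent returns the previous index if the name was already registered
            if (positions.putIfAbsent(name, i) != null) {
                throw new IllegalArgumentException("Duplicate field name: " + name);
            }
        }
        // Read-only view, so lookups stay constant time but the mapping cannot change
        return Collections.unmodifiableMap(positions);
    }
}
```

With something like this, the Map is built once per constructor call, and
name-based lookups afterwards are still constant time.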

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Thanks for the explanation.
Regarding case (b), I share Dawid's confusion: what is the result when
index-based setters and name-based setters are mixed (esp. in foreach
loops and if branches)?
TBH, I don't see a strong need for named setters. Using a Row as the UDAF
accumulator is not as good as a POJO in terms of performance and ease of use.

Best,
Jark

On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi all,
>
> I really like the ideas of this FLIP. I think it improves user
> experience quite a bit. I wanted to add just two comments:
>
> 1. As for the StatementSet I like the approach described in the FLIP for
> its simplicity. Moreover the way I see it is that if a user wants to
> work with DataStream, then he/she wants to end up in the DataStream API,
> or in other words call the StreamExecutionEnvironment#execute.
>
> 2. @Timo What is the interaction between Row setters from the different
> modes? What happens if the user calls both in different order. E.g.
>
> row.setField(0, "ABC");
>
> row.setField("f0", "ABC"); // is this a valid call ?
>
> or
>
> row.setField("f0", "ABC");
>
> row.setField(0, "ABC"); // is this a valid call ?
>
> or
>
> row.setFieldNames(...);
>
> row.setField(0, "ABC"); // is this a valid call ?
>
> Best,
>
> Dawid
>
> On 01/09/2020 11:49, Timo Walther wrote:
> > Hi Jark,
> >
> > thanks for the detailed review. Let me answer your concerns:
> >
> > ## Conversion of DataStream to Table
> >
> > 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > leaf of a QueryOperation tree in the validation phase."
> > I'm fine with allowing `system_proctime` everywhere in the query. Also
> > for SQL, I think we should have done that earlier already to give
> > users the chance to have time based operations also at later stages.
> >
> > 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > assigned implicitly. "
> > Yes, we just use the DataStream API watermark. `system_rowtime()` will
> > just introduce a time attribute, the watermark travels to the Table
> > API and into DataStream API without further code changes.
> >
> > ## Conversion of Table to DataStream
> >
> > 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > DataStream<Row>"
> > 4. "Table.execute(ChangelogMode)"
> > Filtering UPDATE_BEFORE is already quite important as it reduces the
> > amount of data by factor 2. But I also understand your concerns
> > regarding confusing users. I also got the request for a
> > `Table.getChangelogMode()` a couple of times in the past, because
> > users would like to get information about the kind of query that is
> > executed. However, in this case `toChangelogStream(Table)` is
> > equivalent to call ``toChangelogStream(Table.getChangelogMode(),
> > Table)` so we don't need `Table.getChangelogMode()` in the current
> > FLIP design. But this can be future work. Let's start with
> > `toChangelogStream(Table)` and wait for more feedback about this new
> > feature. What do others think?
> >
> > ## Conversion of StatementSet to DataStream API
> >
> > 5. "StreamStatementSet#attachToStream()"
> >
> > I think Godfrey's proposal is too complex for regular users. Instead
> > of continuing with the fluent programming, we would force users to
> > define a DataStream pipeline in a lambda.
> >
> > Furthermore, joining or using connect() with a different DataStream
> > source would not be possible in this design.
> >
> > The `execute()` method of `StatementSet` should not execute the
> > DataStream API subprogram. It mixes the concepts because we tell
> > users: "If you use toDataStream" you need to use
> > `StreamExecutionEnvironment.execute()`.
> >
> > We don't solve every potential use case with the current FLIP design
> > but the most important one where a pipeline just uses an INSERT INTO
> > but also uses Table API for connectors and preprocessing and does the
> > main logic in DataStream API:
> >
> > T1 -> T2, T3 -> DataStream, T4 -> DataStream
> >
> > I would consider `StatementSet.addDataStream(Table, ...)` future work
> > for now as it is only an opimization for reusing parts of the
> > StreamGraph. We could even perform this optimization when calling
> > `toInsertStream` or `toChangelogStream`.
> >
> > ## Improve dealing with Row in DataStream API
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > We need a Map for constant time of mapping field name to index.
> >
> > We accept a nullable `fieldNames` because names are not mandatory, one
> > can also work with indices as before.
> >
> > But you are right that the fieldNames member variable can be
> > immutable. I just wanted to avoid too many overloaded constructors.
> > I'm fine with having one full constructor for RowKind, arity and field
> > names (or null).
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> > Maybe I leaked to many implementation details there that rather
> > confuse readers than help. Internally, we need to distinguish between
> > two kinds of rows. A user should not be bothered by this.
> >
> > a) Row comes from Table API runtime: hasFieldOrder = true
> > Map("myAge" -> 0, "myName" -> 1)
> >
> > row.getField("myName") == row.getField(1)
> > row.getField("myAge") == row.getField(0)
> >
> > b) Row comes from user: hasFieldOrder = false
> > Row row = new Row(2);
> > row.setField("myName", "Alice");
> > row.setField("myAge", 32);
> >
> > Map("myAge" -> 1, "myName" -> 0)
> >
> > But the type information will decide about the order of the fields
> > later and reorder them accordingly during serialization or RowData
> > conversion:
> >
> > ["myName", "myAge"] vs. ["myAge", "myName"]
> >
> > The user must not care about this as it always feels naturally to deal
> > with the rows.
> >
> > Regards,
> > Timo
> >
> >
> > On 01.09.20 06:19, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks a lot for the great proposal and sorry for the late reply.
> >> This is
> >> an important improvement for DataStream and Table API users.
> >>
> >> I have listed my thoughts and questions below ;-)
> >>
> >> ## Conversion of DataStream to Table
> >>
> >> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> >> leaf of
> >> a QueryOperation tree in the validation phase."
> >> IIUC, that means `system_rowtime()` can only be used in the first
> >> `select()` after `fromXxxStream()`, right?
> >> However, I think `system_proctime()` shouldn't have this limitation,
> >> because it doesn't rely on the underlying timestamp of StreamRecord and
> >>   can be generated in any stage of the query.
> >>
> >> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> >> assigned implicitly. "
> >> What watermark will be used here? Is the pre-assigned watermark in the
> >> DataStream (so called `system_watermak()`)?
> >>
> >> ## Conversion of Table to DataStream
> >>
> >> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> >> DataStream<Row>"
> >> I'm not sure whether this method is useful for users. Currently, the
> >> `DynamicTableSinks#getChagnelogMode(changelogMode)` is only used for
> >> filtering UPDATE_BEFORE if possible.
> >> However, if we expose this method to users, it may be confusing.
> >> Users may
> >> try to use this method to convert a changelog stream to an insert-only
> >> stream by applying ChangelogMode.insertOnly(). This might be misleading.
> >> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> >> have
> >> to know the ChangelogMode of the current Table first, and remove
> >> UPDATE_BEFORE from the ChagnelogMode.
> >> That means we have to support `Table.getChangelogMode()` first? But
> >> `ChangelogMode` derivation requires a full optimization path on the
> >> Table,
> >> which seems impossible now.
> >> Therefore, IMHO, we can introduce this interface in the future if users
> >> indeed need this. For most users, I think `toChangelogStream(Table)` is
> >> enough.
> >>
> >> 4. "Table.execute(ChangelogMode)"
> >> Ditto.
> >>
> >> ## Conversion of StatementSet to DataStream API
> >>
> >> 5. "StreamStatementSet#attachToStream()"
> >> I think the potential drawback is that it can't support multi-sink
> >> optimization, i.e. share pipeline.
> >> For example, if we have a Table `t1` (a heavy view uses join,
> >> aggregate),
> >> and want to sink to "mysql" using SQL and want to continue processing
> >> using
> >> DataStream in a job.
> >> It's a huge waste of resources if we re-compute `t1`. It would be
> >> nice if
> >> we can come up with a solution to share the pipeline.
> >>
> >> I borrowed Godfrey's idea in FLINK-18840 and added some
> >> modifications. What
> >> do you think about the following proposal?
> >>
> >> interface StatementSet {
> >>     StatementSet addDataStream(Table table, TableDataStreamTransform
> >> transform);
> >> }
> >>
> >> interface TableDataStreamTransform {
> >>     void transform(Context);
> >>
> >>     interface Context {
> >>         Table getTable();
> >>         DataStream<Row> toInsertStream(Table);
> >>         DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> >>         DataStream<Row> toChangelogStream(Table);
> >>     }
> >> }
> >>
> >> tEnv
> >>    .createStatementSet()
> >>    .addInsert("mysql", table1)
> >>    .addDataStream(table1, ctx -> {
> >>        ctx.toInsertStream(ctx.getTable())
> >>          .flatmap(..)
> >>          .keyBy(..)
> >>          .process(..)
> >>          .addSink(...);
> >>    })
> >>
> >>
> >> ## Improve dealing with Row in DataStream API
> >>
> >> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is
> >> enough and more handy than Map ?
> >> - Currently, the fieldNames member variable is mutable, is it on
> >> purpose?
> >> Can we make it immutable? For example, only accept from the constructor.
> >> - Why do we accept a nullable `fieldNames`?
> >>
> >> 7. "a Row has two modes represented by an internal boolean flag
> >> `hasFieldOrder`."
> >> Sorry, I don't fully understand what does the `hasFieldOrder` mean
> >> and is
> >> used for. Could you explain a bit more for this?
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:
> >>
> >>> Hi David,
> >>>
> >>> thanks for your feedback. Feedback from someone who interacts with many
> >>> users is very valuable. I added an explanation for StatementSets to the
> >>> FLIP.
> >>>
> >>> Regarding watermarks and fromInsertStream, actually the
> >>>
> >>> `Schema.watermark("ts", system_watermark())`
> >>>
> >>> is not really necessary in the `fromChangelogStream`. It is added to
> >>> satify the Schema interface and be similar to SQL DDL.
> >>>
> >>> We could already extract the watermark strategy if we see
> >>> `system_rowtime()` because in most of the cases we will simply use the
> >>> DataStream API watermarks.
> >>>
> >>> But maybe some users want to generate watermarks after preprocessing in
> >>> DataStream API. In this cases users what to define a computed watermark
> >>> expression.
> >>>
> >>> So for simplicity in the Simple API we introduce:
> >>>
> >>> tEnv
> >>>     .fromInsertStream(DataStream<T>)
> >>>     .select('*, system_rowtime().as("rowtime"),
> >>> system_proctime().as("proctime"))
> >>>
> >>> and just rely on the watermarks that travel through DataStream API
> >>> already. I added another comment to the FLIP.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 19.08.20 10:53, David Anderson wrote:
> >>>> Timo, nice to see this.
> >>>>
> >>>> As someone who expects to use these interfaces, but who doesn't fully
> >>>> understand the existing Table API, I like what I see. Just a couple of
> >>>> comments:
> >>>>
> >>>> The way that watermarks fit into the fromChangelogStream case makes
> >>>> sense
> >>>> to me, and I'm wondering why watermarks don't come up in the previous
> >>>> section about fromInsertStream.
> >>>>
> >>>> I wasn't familiar with StatementSets, and I couldn't find an
> >>>> explanation
> >>> in
> >>>> the docs. I eventually found this short paragraph in an email from
> >>>> Fabian
> >>>> Hueske, which clarified everything in that section for me:
> >>>>
> >>>>       FLIP-84 [1] added the concept of a "statement set" to group
> >>>> multiple
> >>>> INSERT
> >>>>       INTO statements (SQL or Table API) together. The statements in a
> >>>> statement
> >>>>       set are jointly optimized and executed as a single Flink job.
> >>>>
> >>>> Maybe if you add this to the FLIP it will help other readers as well.
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> I would like to propose a FLIP that aims to resolve the remaining
> >>>>> shortcomings in the Table API:
> >>>>>
> >>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>
> >>>>>
> >>>>> The Table API has received many new features over the last year. It
> >>>>> supports a new type system (FLIP-37), connectors support changelogs
> >>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >>>>> support for result retrieval in an interactive fashion (FLIP-84), and
> >>>>> soon new TableDescriptors (FLIP-129).
> >>>>>
> >>>>> However, the interfaces from and to DataStream API have not been
> >>>>> touched
> >>>>> during the introduction of these new features and are kind of
> >>>>> outdated.
> >>>>> The interfaces lack important functionality that is available in
> >>>>> Table
> >>>>> API but not exposed to DataStream API users. DataStream API is
> >>>>> still our
> >>>>> most important API which is why a good interoperability is crucial.
> >>>>>
> >>>>> This FLIP is a mixture of different topics that improve the
> >>>>> interoperability between DataStream and Table API in terms of:
> >>>>>
> >>>>> - DataStream <-> Table conversion
> >>>>> - translation of type systems TypeInformation <-> DataType
> >>>>> - schema definition (incl. rowtime, watermarks, primary key)
> >>>>> - changelog handling
> >>>>> - row handling in DataStream API
> >>>>>
> >>>>> I'm looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi all,

I really like the ideas of this FLIP. I think it improves user
experience quite a bit. I wanted to add just two comments:

1. As for the StatementSet, I like the approach described in the FLIP for
its simplicity. Moreover, the way I see it, if a user wants to work with
DataStream, then he/she wants to end up in the DataStream API, or in
other words call StreamExecutionEnvironment#execute.

2. @Timo What is the interaction between Row setters from the different
modes? What happens if the user calls both, in different orders? E.g.

row.setField(0, "ABC");

row.setField("f0", "ABC"); // is this a valid call ?

or

row.setField("f0", "ABC");

row.setField(0, "ABC"); // is this a valid call ?

or

row.setFieldNames(...);

row.setField(0, "ABC"); // is this a valid call ?

Best,

Dawid

On 01/09/2020 11:49, Timo Walther wrote:
> Hi Jark,
>
> thanks for the detailed review. Let me answer your concerns:
>
> ## Conversion of DataStream to Table
>
> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> leaf of a QueryOperation tree in the validation phase."
> I'm fine with allowing `system_proctime` everywhere in the query. Also
> for SQL, I think we should have done that earlier already to give
> users the chance to have time based operations also at later stages.
>
> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> assigned implicitly. "
> Yes, we just use the DataStream API watermark. `system_rowtime()` will
> just introduce a time attribute, the watermark travels to the Table
> API and into DataStream API without further code changes.
>
> ## Conversion of Table to DataStream
>
> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> DataStream<Row>"
> 4. "Table.execute(ChangelogMode)"
> Filtering UPDATE_BEFORE is already quite important as it reduces the
> amount of data by factor 2. But I also understand your concerns
> regarding confusing users. I also got the request for a
> `Table.getChangelogMode()` a couple of times in the past, because
> users would like to get information about the kind of query that is
> executed. However, in this case `toChangelogStream(Table)` is
> equivalent to call ``toChangelogStream(Table.getChangelogMode(),
> Table)` so we don't need `Table.getChangelogMode()` in the current
> FLIP design. But this can be future work. Let's start with
> `toChangelogStream(Table)` and wait for more feedback about this new
> feature. What do others think?
>
> ## Conversion of StatementSet to DataStream API
>
> 5. "StreamStatementSet#attachToStream()"
>
> I think Godfrey's proposal is too complex for regular users. Instead
> of continuing with the fluent programming, we would force users to
> define a DataStream pipeline in a lambda.
>
> Furthermore, joining or using connect() with a different DataStream
> source would not be possible in this design.
>
> The `execute()` method of `StatementSet` should not execute the
> DataStream API subprogram. It mixes the concepts because we tell
> users: "If you use toDataStream" you need to use
> `StreamExecutionEnvironment.execute()`.
>
> We don't solve every potential use case with the current FLIP design
> but the most important one where a pipeline just uses an INSERT INTO
> but also uses Table API for connectors and preprocessing and does the
> main logic in DataStream API:
>
> T1 -> T2, T3 -> DataStream, T4 -> DataStream
>
> I would consider `StatementSet.addDataStream(Table, ...)` future work
> for now as it is only an opimization for reusing parts of the
> StreamGraph. We could even perform this optimization when calling
> `toInsertStream` or `toChangelogStream`.
>
> ## Improve dealing with Row in DataStream API
>
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>
> We need a Map for constant time of mapping field name to index.
>
> We accept a nullable `fieldNames` because names are not mandatory, one
> can also work with indices as before.
>
> But you are right that the fieldNames member variable can be
> immutable. I just wanted to avoid too many overloaded constructors.
> I'm fine with having one full constructor for RowKind, arity and field
> names (or null).
>
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
> Maybe I leaked to many implementation details there that rather
> confuse readers than help. Internally, we need to distinguish between
> two kinds of rows. A user should not be bothered by this.
>
> a) Row comes from Table API runtime: hasFieldOrder = true
> Map("myAge" -> 0, "myName" -> 1)
>
> row.getField("myName") == row.getField(1)
> row.getField("myAge") == row.getField(0)
>
> b) Row comes from user: hasFieldOrder = false
> Row row = new Row(2);
> row.setField("myName", "Alice");
> row.setField("myAge", 32);
>
> Map("myAge" -> 1, "myName" -> 0)
>
> But the type information will decide about the order of the fields
> later and reorder them accordingly during serialization or RowData
> conversion:
>
> ["myName", "myAge"] vs. ["myAge", "myName"]
>
> The user must not care about this as it always feels naturally to deal
> with the rows.
>
> Regards,
> Timo
>
>
> On 01.09.20 06:19, Jark Wu wrote:
>> Hi Timo,
>>
>> Thanks a lot for the great proposal and sorry for the late reply.
>> This is
>> an important improvement for DataStream and Table API users.
>>
>> I have listed my thoughts and questions below ;-)
>>
>> ## Conversion of DataStream to Table
>>
>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
>> leaf of
>> a QueryOperation tree in the validation phase."
>> IIUC, that means `system_rowtime()` can only be used in the first
>> `select()` after `fromXxxStream()`, right?
>> However, I think `system_proctime()` shouldn't have this limitation,
>> because it doesn't rely on the underlying timestamp of StreamRecord and
>>   can be generated in any stage of the query.
>>
>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
>> assigned implicitly. "
>> What watermark will be used here? Is the pre-assigned watermark in the
>> DataStream (so called `system_watermak()`)?
>>
>> ## Conversion of Table to DataStream
>>
>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
>> DataStream<Row>"
>> I'm not sure whether this method is useful for users. Currently, the
>> `DynamicTableSinks#getChagnelogMode(changelogMode)` is only used for
>> filtering UPDATE_BEFORE if possible.
>> However, if we expose this method to users, it may be confusing.
>> Users may
>> try to use this method to convert a changelog stream to an insert-only
>> stream by applying ChangelogMode.insertOnly(). This might be misleading.
>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
>> have
>> to know the ChangelogMode of the current Table first, and remove
>> UPDATE_BEFORE from the ChagnelogMode.
>> That means we have to support `Table.getChangelogMode()` first? But
>> `ChangelogMode` derivation requires a full optimization path on the
>> Table,
>> which seems impossible now.
>> Therefore, IMHO, we can introduce this interface in the future if users
>> indeed need this. For most users, I think `toChangelogStream(Table)` is
>> enough.
>>
>> 4. "Table.execute(ChangelogMode)"
>> Ditto.
>>
>> ## Conversion of StatementSet to DataStream API
>>
>> 5. "StreamStatementSet#attachToStream()"
>> I think the potential drawback is that it can't support multi-sink
>> optimization, i.e. share pipeline.
>> For example, if we have a Table `t1` (a heavy view uses join,
>> aggregate),
>> and want to sink to "mysql" using SQL and want to continue processing
>> using
>> DataStream in a job.
>> It's a huge waste of resources if we re-compute `t1`. It would be
>> nice if
>> we can come up with a solution to share the pipeline.
>>
>> I borrowed Godfrey's idea in FLINK-18840 and added some
>> modifications. What
>> do you think about the following proposal?
>>
>> interface StatementSet {
>>     StatementSet addDataStream(Table table, TableDataStreamTransform
>> transform);
>> }
>>
>> interface TableDataStreamTransform {
>>     void transform(Context);
>>
>>     interface Context {
>>         Table getTable();
>>         DataStream<Row> toInsertStream(Table);
>>         DataStream<T> toInsertStream(AbstractDataType<?>, Table);
>>         DataStream<Row> toChangelogStream(Table);
>>     }
>> }
>>
>> tEnv
>>    .createStatementSet()
>>    .addInsert("mysql", table1)
>>    .addDataStream(table1, ctx -> {
>>        ctx.toInsertStream(ctx.getTable())
>>          .flatmap(..)
>>          .keyBy(..)
>>          .process(..)
>>          .addSink(...);
>>    })
>>
>>
>> ## Improve dealing with Row in DataStream API
>>
>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is
>> enough and more handy than Map ?
>> - Currently, the fieldNames member variable is mutable, is it on
>> purpose?
>> Can we make it immutable? For example, only accept from the constructor.
>> - Why do we accept a nullable `fieldNames`?
>>
>> 7. "a Row has two modes represented by an internal boolean flag
>> `hasFieldOrder`."
>> Sorry, I don't fully understand what does the `hasFieldOrder` mean
>> and is
>> used for. Could you explain a bit more for this?
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:
>>
>>> Hi David,
>>>
>>> thanks for your feedback. Feedback from someone who interacts with many
>>> users is very valuable. I added an explanation for StatementSets to the
>>> FLIP.
>>>
>>> Regarding watermarks and fromInsertStream, actually the
>>>
>>> `Schema.watermark("ts", system_watermark())`
>>>
>>> is not really necessary in the `fromChangelogStream`. It is added to
>>> satify the Schema interface and be similar to SQL DDL.
>>>
>>> We could already extract the watermark strategy if we see
>>> `system_rowtime()` because in most of the cases we will simply use the
>>> DataStream API watermarks.
>>>
>>> But maybe some users want to generate watermarks after preprocessing in
>>> DataStream API. In this cases users what to define a computed watermark
>>> expression.
>>>
>>> So for simplicity in the Simple API we introduce:
>>>
>>> tEnv
>>>     .fromInsertStream(DataStream<T>)
>>>     .select('*, system_rowtime().as("rowtime"),
>>> system_proctime().as("proctime"))
>>>
>>> and just rely on the watermarks that travel through DataStream API
>>> already. I added another comment to the FLIP.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 19.08.20 10:53, David Anderson wrote:
>>>> Timo, nice to see this.
>>>>
>>>> As someone who expects to use these interfaces, but who doesn't fully
>>>> understand the existing Table API, I like what I see. Just a couple of
>>>> comments:
>>>>
>>>> The way that watermarks fit into the fromChangelogStream case makes
>>>> sense
>>>> to me, and I'm wondering why watermarks don't come up in the previous
>>>> section about fromInsertStream.
>>>>
>>>> I wasn't familiar with StatementSets, and I couldn't find an
>>>> explanation
>>> in
>>>> the docs. I eventually found this short paragraph in an email from
>>>> Fabian
>>>> Hueske, which clarified everything in that section for me:
>>>>
>>>>       FLIP-84 [1] added the concept of a "statement set" to group
>>>> multiple
>>>> INSERT
>>>>       INTO statements (SQL or Table API) together. The statements in a
>>>> statement
>>>>       set are jointly optimized and executed as a single Flink job.
>>>>
>>>> Maybe if you add this to the FLIP it will help other readers as well.
>>>>
>>>> Best,
>>>> David
>>>>
>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>> shortcomings in the Table API:
>>>>>
>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>
>>>>>
>>>>> The Table API has received many new features over the last year. It
>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>> soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to DataStream API have not been
>>>>> touched
>>>>> during the introduction of these new features and are kind of
>>>>> outdated.
>>>>> The interfaces lack important functionality that is available in
>>>>> Table
>>>>> API but not exposed to DataStream API users. DataStream API is
>>>>> still our
>>>>> most important API which is why a good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the
>>>>> interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>
>>>
>>>
>>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Posted by Timo Walther <tw...@apache.org>.
Hi Jark,

thanks for the detailed review. Let me answer your concerns:

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf 
of a QueryOperation tree in the validation phase."
I'm fine with allowing `system_proctime` everywhere in the query. Also 
for SQL, I think we should have done that earlier already to give users 
the chance to have time based operations also at later stages.

2. "By using `system_rowtime().as("rowtime")` the watermark would be 
assigned implicitly. "
Yes, we just use the DataStream API watermark. `system_rowtime()` will 
just introduce a time attribute, the watermark travels to the Table API 
and into DataStream API without further code changes.

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): 
DataStream<Row>"
4. "Table.execute(ChangelogMode)"
Filtering UPDATE_BEFORE is already quite important as it reduces the 
amount of data by a factor of 2. But I also understand your concerns 
regarding confusing users. I also got the request for a 
`Table.getChangelogMode()` a couple of times in the past, because users 
would like to get information about the kind of query that is executed. 
However, in this case `toChangelogStream(Table)` is equivalent to calling 
`toChangelogStream(Table.getChangelogMode(), Table)` so we don't need 
`Table.getChangelogMode()` in the current FLIP design. But this can be 
future work. Let's start with `toChangelogStream(Table)` and wait for 
more feedback about this new feature. What do others think?
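
To illustrate the "factor 2" point above, here is a minimal sketch in
plain Java. It does not use any Flink classes: `Kind`, `Change`,
`sampleUpdates` and `dropUpdateBefore` are hypothetical names that only
mirror the idea of a changelog with UPDATE_BEFORE/UPDATE_AFTER pairs.
For a key that is inserted once and updated n times, the full changelog
has 1 + 2n entries, while the one without UPDATE_BEFORE has 1 + n, so
the saving approaches a factor of 2 only for update-heavy streams.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a changelog; not part of Flink or the FLIP.
final class ChangelogFilterSketch {

    // Mirrors the four kinds of changes a changelog can carry.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String key;
        final int value;

        Change(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Builds a changelog for one key: one INSERT followed by `updates`
    // pairs of UPDATE_BEFORE (old value) and UPDATE_AFTER (new value).
    static List<Change> sampleUpdates(int updates) {
        List<Change> log = new ArrayList<>();
        int value = 0;
        log.add(new Change(Kind.INSERT, "k", value));
        for (int i = 0; i < updates; i++) {
            log.add(new Change(Kind.UPDATE_BEFORE, "k", value));
            value++;
            log.add(new Change(Kind.UPDATE_AFTER, "k", value));
        }
        return log;
    }

    // Drops every UPDATE_BEFORE entry and keeps all other changes in order.
    static List<Change> dropUpdateBefore(List<Change> changelog) {
        List<Change> out = new ArrayList<>();
        for (Change c : changelog) {
            if (c.kind != Kind.UPDATE_BEFORE) {
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> full = sampleUpdates(3);
        List<Change> reduced = dropUpdateBefore(full);
        System.out.println(full.size() + " -> " + reduced.size()); // prints "7 -> 4"
    }
}
```

Note that a consumer of the reduced stream needs a key to apply each
UPDATE_AFTER as an upsert, since the old value is no longer shipped.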

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"

I think Godfrey's proposal is too complex for regular users. Instead of 
continuing with the fluent programming, we would force users to define a 
DataStream pipeline in a lambda.

Furthermore, joining or using connect() with a different DataStream 
source would not be possible in this design.

The `execute()` method of `StatementSet` should not execute the 
DataStream API subprogram. It mixes the concepts because we tell users: 
"If you use `toDataStream`, you need to use 
`StreamExecutionEnvironment.execute()`."

We don't solve every potential use case with the current FLIP design but 
the most important one where a pipeline just uses an INSERT INTO but 
also uses Table API for connectors and preprocessing and does the main 
logic in DataStream API:

T1 -> T2, T3 -> DataStream, T4 -> DataStream

I would consider `StatementSet.addDataStream(Table, ...)` future work 
for now as it is only an optimization for reusing parts of the 
StreamGraph. We could even perform this optimization when calling 
`toInsertStream` or `toChangelogStream`.

## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"

We need a Map for constant-time lookup from field name to index.

We accept a nullable `fieldNames` because names are not mandatory, one 
can also work with indices as before.

But you are right that the fieldNames member variable can be immutable. 
I just wanted to avoid too many overloaded constructors. I'm fine with 
having one full constructor for RowKind, arity and field names (or null).

7. "a Row has two modes represented by an internal boolean flag 
`hasFieldOrder`."
Maybe I leaked too many implementation details there that rather confuse 
readers than help. Internally, we need to distinguish between two kinds 
of rows. A user should not be bothered by this.

a) Row comes from Table API runtime: hasFieldOrder = true
Map("myAge" -> 0, "myName" -> 1)

row.getField("myName") == row.getField(1)
row.getField("myAge") == row.getField(0)

b) Row comes from user: hasFieldOrder = false
Row row = new Row(2);
row.setField("myName", "Alice");
row.setField("myAge", 32);

Map("myAge" -> 1, "myName" -> 0)

But the type information will decide about the order of the fields later 
and reorder them accordingly during serialization or RowData conversion:

["myName", "myAge"] vs. ["myAge", "myName"]

The user does not need to care about this, as it always feels natural 
to deal with the rows.
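
The name-based mode (case b) can be sketched in plain Java as follows.
This is only an illustration under stated assumptions, not the proposed
`Row` implementation: `NamedRowSketch` and `toPositions` are
hypothetical names, and the "type information" is reduced to a plain
list of field names that dictates the final positions.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a user-created row without a fixed field order.
final class NamedRowSketch {

    // Values are stored by name; the insertion order carries no meaning.
    private final Map<String, Object> byName = new LinkedHashMap<>();

    void setField(String name, Object value) {
        byName.put(name, value);
    }

    Object getField(String name) {
        return byName.get(name);
    }

    // Arranges the values in the order given by the schema, as would
    // happen later during serialization or conversion.
    Object[] toPositions(List<String> schemaOrder) {
        Object[] out = new Object[schemaOrder.size()];
        for (int i = 0; i < out.length; i++) {
            String name = schemaOrder.get(i);
            if (!byName.containsKey(name)) {
                throw new IllegalArgumentException("Missing field: " + name);
            }
            out[i] = byName.get(name);
        }
        return out;
    }

    public static void main(String[] args) {
        NamedRowSketch row = new NamedRowSketch();
        row.setField("myName", "Alice");
        row.setField("myAge", 32);

        // The schema, not the order of setField calls, decides positions.
        Object[] ordered = row.toPositions(Arrays.asList("myAge", "myName"));
        System.out.println(Arrays.toString(ordered)); // prints "[32, Alice]"
    }
}
```

The same row yields ["Alice", 32] for the schema ["myName", "myAge"],
which is why the user never has to think about positions in this mode.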

Regards,
Timo


On 01.09.20 06:19, Jark Wu wrote:
> Hi Timo,
> 
> Thanks a lot for the great proposal and sorry for the late reply. This is
> an important improvement for DataStream and Table API users.
> 
> I have listed my thoughts and questions below ;-)
> 
> ## Conversion of DataStream to Table
> 
> 1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of
> a QueryOperation tree in the validation phase."
> IIUC, that means `system_rowtime()` can only be used in the first
> `select()` after `fromXxxStream()`, right?
> However, I think `system_proctime()` shouldn't have this limitation,
> because it doesn't rely on the underlying timestamp of StreamRecord and
>   can be generated in any stage of the query.
> 
> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> assigned implicitly. "
> What watermark will be used here? Is the pre-assigned watermark in the
> DataStream (so called `system_watermark()`)?
> 
> ## Conversion of Table to DataStream
> 
> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> DataStream<Row>"
> I'm not sure whether this method is useful for users. Currently, the
> `DynamicTableSinks#getChangelogMode(changelogMode)` is only used for
> filtering UPDATE_BEFORE if possible.
> However, if we expose this method to users, it may be confusing. Users may
> try to use this method to convert a changelog stream to an insert-only
> stream by applying ChangelogMode.insertOnly(). This might be misleading.
> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have
> to know the ChangelogMode of the current Table first, and remove
> UPDATE_BEFORE from the ChangelogMode.
> That means we have to support `Table.getChangelogMode()` first? But
> `ChangelogMode` derivation requires a full optimization path on the Table,
> which seems impossible now.
> Therefore, IMHO, we can introduce this interface in the future if users
> indeed need this. For most users, I think `toChangelogStream(Table)` is
> enough.
> 
> 4. "Table.execute(ChangelogMode)"
> Ditto.
> 
> ## Conversion of StatementSet to DataStream API
> 
> 5. "StreamStatementSet#attachToStream()"
> I think the potential drawback is that it can't support multi-sink
> optimization, i.e. share pipeline.
> For example, if we have a Table `t1` (a heavy view uses join, aggregate),
> and want to sink to "mysql" using SQL and want to continue processing using
> DataStream in a job.
> It's a huge waste of resources if we re-compute `t1`. It would be nice if
> we can come up with a solution to share the pipeline.
> 
> I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What
> do you think about the following proposal?
> 
> interface StatementSet {
>     StatementSet addDataStream(Table table, TableDataStreamTransform
> transform);
> }
> 
> interface TableDataStreamTransform {
>     void transform(Context);
> 
>     interface Context {
>         Table getTable();
>         DataStream<Row> toInsertStream(Table);
>         DataStream<T> toInsertStream(AbstractDataType<?>, Table);
>         DataStream<Row> toChangelogStream(Table);
>     }
> }
> 
> tEnv
>    .createStatementSet()
>    .addInsert("mysql", table1)
>    .addDataStream(table1, ctx -> {
>        ctx.toInsertStream(ctx.getTable())
>          .flatMap(..)
>          .keyBy(..)
>          .process(..)
>          .addSink(...);
>    })
> 
> 
> ## Improve dealing with Row in DataStream API
> 
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is
> enough and more handy than Map ?
> - Currently, the fieldNames member variable is mutable, is it on purpose?
> Can we make it immutable? For example, only accept from the constructor.
> - Why do we accept a nullable `fieldNames`?
> 
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
> Sorry, I don't fully understand what does the `hasFieldOrder` mean and is
> used for. Could you explain a bit more for this?
> 
> Best,
> Jark
> 
> 
> On Wed, 19 Aug 2020 at 17:38, Timo Walther <tw...@apache.org> wrote:
> 
>> Hi David,
>>
>> thanks for your feedback. Feedback from someone who interacts with many
>> users is very valuable. I added an explanation for StatementSets to the
>> FLIP.
>>
>> Regarding watermarks and fromInsertStream, actually the
>>
>> `Schema.watermark("ts", system_watermark())`
>>
>> is not really necessary in the `fromChangelogStream`. It is added to
>> satisfy the Schema interface and be similar to SQL DDL.
>>
>> We could already extract the watermark strategy if we see
>> `system_rowtime()` because in most of the cases we will simply use the
>> DataStream API watermarks.
>>
>> But maybe some users want to generate watermarks after preprocessing in
>> DataStream API. In these cases users want to define a computed watermark
>> expression.
>>
>> So for simplicity in the Simple API we introduce:
>>
>> tEnv
>>     .fromInsertStream(DataStream<T>)
>>     .select('*, system_rowtime().as("rowtime"),
>> system_proctime().as("proctime"))
>>
>> and just rely on the watermarks that travel through DataStream API
>> already. I added another comment to the FLIP.
>>
>> Regards,
>> Timo
>>
>>
>> On 19.08.20 10:53, David Anderson wrote:
>>> Timo, nice to see this.
>>>
>>> As someone who expects to use these interfaces, but who doesn't fully
>>> understand the existing Table API, I like what I see. Just a couple of
>>> comments:
>>>
>>> The way that watermarks fit into the fromChangelogStream case makes sense
>>> to me, and I'm wondering why watermarks don't come up in the previous
>>> section about fromInsertStream.
>>>
>>> I wasn't familiar with StatementSets, and I couldn't find an explanation
>> in
>>> the docs. I eventually found this short paragraph in an email from Fabian
>>> Hueske, which clarified everything in that section for me:
>>>
>>>       FLIP-84 [1] added the concept of a "statement set" to group multiple
>>> INSERT
>>>       INTO statements (SQL or Table API) together. The statements in a
>>> statement
>>>       set are jointly optimized and executed as a single Flink job.
>>>
>>> Maybe if you add this to the FLIP it will help other readers as well.
>>>
>>> Best,
>>> David
>>>
>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <tw...@apache.org>
>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>> shortcomings in the Table API:
>>>>
>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>
>>>> The Table API has received many new features over the last year. It
>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>> soon new TableDescriptors (FLIP-129).
>>>>
>>>> However, the interfaces from and to DataStream API have not been touched
>>>> during the introduction of these new features and are kind of outdated.
>>>> The interfaces lack important functionality that is available in Table
>>>> API but not exposed to DataStream API users. DataStream API is still our
>>>> most important API which is why a good interoperability is crucial.
>>>>
>>>> This FLIP is a mixture of different topics that improve the
>>>> interoperability between DataStream and Table API in terms of:
>>>>
>>>> - DataStream <-> Table conversion
>>>> - translation of type systems TypeInformation <-> DataType
>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>> - changelog handling
>>>> - row handling in DataStream API
>>>>
>>>> I'm looking forward to your feedback.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>
>>
>>
>