You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Sean Wang <ws...@gmail.com> on 2016/11/02 03:49:05 UTC

Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Stephan and Fabian,
Thanks for the replies and very happy to see that my questions have raised
a good discussion here. This is more than just FLIP-11, but the future of
flink SQL (glad all of us agree to use the standard compliant SQL
interface) and the roll of Table API.
The syntax of TableAPI is just one's favour, but IMO, the tradeoff between
SQL syntax and TableAPI flexibility should be well considered during the
design. I hope to see a powerful, concise, and compatible TableAPI.
I agree with your other comments on Flip11.

Regards,
Shaoxuan


On Fri, Oct 28, 2016 at 11:09 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Thanks for bringing up this point Stephan.
>
> I think it is a good decision to have a standard compliant SQL interface
> that SQL developers and tools can use. The SQL proposal [1] I posted a few
> days ago follows this approach.
>
> However, SQL was not designed with stream processing in mind and many
> important streaming concepts are either cumbersome to express or not at all
> expressible with plain SQL (see Tyler Akidau's [2] proposal for instance).
> I think the Table API has the chance to improve a lot of these issues by
> not following the SQL standard too closely.
>
> +1 for defining window aggregates with the explicit the window() and
> rowWindow() clauses.
>
> +1 for sticking to groupBy() instead of partitionBy().
>
> There are a few more ideas that can make the Table API better suited for
> streaming such as:
>
> - have built-in emission and refinement strategies (see StreamSQL doc [1])
> - easier joins against time-variant tables (Temporal table proposal of
> Julian Hyde [3])
> - support for DataStream-like UDFs
>
> These issues should be discussed separately though.
>
> Best,
> Fabian
>
> [1]
> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQ
> PW4tnl8THw6rzGUdaqU
> [2]
> https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU
> 6BERXXDZ3t0HzSLij9Q
> [3]
> https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szp
> kbGqFMBtzYiIY4dHe0Q
>
> 2016-10-28 10:05 GMT+02:00 Jark Wu <wu...@alibaba-inc.com>:
>
> > Hi ,
> >
> > I agree that Table API should be a SQL-like API, that do not need to be
> > strictly consistent with SQL.
> >
> > > - Not put the window definition into the groupBy clause.
> >
> > +1  Putting the window clause out of groupBy, will be easy to learn and
> > easy to discover for users.
> >
> > > I like the idea of having separate ".window()" and ".rowWindow()"
> > > clauses.
> >
> > +1
> >
> > >  I would prefer to not have a "partitionBy" statement.
> >
> > As far as I know, the partitionBy is playing the same role with groupBy
> in
> > Table API. So maybe groupBy is enough, there is no need to introduce a
> new
> > clause.
> >
> >
> > - Jark Wu
> >
> > > 在 2016年10月27日,上午2:22,Stephan Ewen <se...@apache.org> 写道:
> > >
> > > Hi all!
> > >
> > > I think that in order to get a better hold on how we what to build the
> > > Table API, we need to *decide what the role of the Table API should
> be*.
> > We
> > > touched on that a few times, but I think we still have different ideas
> > > about that.
> > >
> > > To get there, let me take back a step and look at the design of Stream
> > SQL
> > > again. There were basically two competing approaches:
> > >
> > > (1) Keep SQL as it is and make it run on infinite streams via
> introducing
> > > dynamic tables
> > > (2) Do a new language that is similar to SQL, but designed with
> streaming
> > > concepts in mind (first class support for time and windows)
> > >
> > > Both approaches had good points. The Stream SQL design doc posted
> > followed
> > > approach (1) - keep SQL as it is.
> > >
> > >
> > >
> > > Now, for the Table API, we seem to be having a similar discussion
> again.
> > >
> > > (1) Let the Table API be as similar to SQL as possible, simply make it
> > feel
> > > "fluently embedded" in Scala.
> > > (2) Define the Table API as one would define a new and clean DSL for
> > > streaming. SQL inspired, of course. Where SQL syntax feels natural, use
> > the
> > > SQL syntax, but make it very accessible to Java/Scala (non-SQL)
> > programmers.
> > >
> > >
> > > I am personally more in favor of variant (2) for the following reasons:
> > >
> > >  - We already have SQL compliant with the standards and tools.
> > >
> > >  - Mirroring SQL too closely into the Table API has the marginal
> benefit
> > > that someone close to SQL will find it a bit more familiar. Not sure if
> > > that is even the case, as they have to re-learn the fluent DSL and
> Scala
> > > concepts
> > >
> > >  - We are making it more difficult for all those that come from a more
> > > Scala/Java DataStream background and simply want to move "a layer up",
> > > getting schema and more optimizations into the equation.
> > >
> > >
> > >
> > >
> > > What would that mean for the specific issues that are discussed in
> > FLIP-11?
> > > Based on interpreting the Table API as re-imaged streaming DSL, I would
> > > suggest to
> > >
> > >  - Not put the window definition into the groupBy clause. It just is
> > > unexpected for all that are not super familiar with SQL and hard to
> > > discover in the IDE. A separate window clause is simpler for users
> coming
> > > from the DataStream background (or other streaming APIs) and it is more
> > > discoverable in the IDE.
> > >
> > >  - I like the idea of having separate ".window()" and ".rowWindow()"
> > > clauses. Makes it more explicit that very different things will happen.
> > >
> > >  - I would prefer to not have a "partitionBy" statement. When we
> restrain
> > > the Table API at least initially to having one partitioning for the
> > > windows, we should be able to express the partitioning by simply adding
> > it
> > > to the fields in the "groupBy" clause. That would make the API easier
> > > accessible to users that not SQL powerusers.
> > >
> > >
> > > What do others think?
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske <fh...@gmail.com>
> > wrote:
> > >
> > >> Thanks for your reply Shaoxuan!
> > >>
> > >> Please see my replies below.
> > >>
> > >> Best, Fabian
> > >>
> > >> 2016-10-14 11:34 GMT+02:00 Sean Wang <ws...@gmail.com>:
> > >>
> > >>> Thanks for your quick reply, Fabian.
> > >>>
> > >>> I have a few minor comments&suggestions:
> > >>>
> > >>> <GroupBy with window>
> > >>> - Agree that we should consider GroupBy without window after the new
> > SQL
> > >>> proposal is settled down.
> > >>>
> > >>>
> > >> OK, so we keep this as it is for now. GroupBy without windows will be
> > >> supported later when we are able to "guard" the memory requirements of
> > that
> > >> operation.
> > >>
> > >>
> > >>> <GroupBy with window>
> > >>> - For Java API, we can keep window() call, and put window alias into
> > >>> Groupby clause. This can be also applied to rowwindow case.
> > >>>
> > >>>
> > >> Referring to the window alias in the groupBy clause would require to
> > invert
> > >> the methods, i.e., groupBy().window() -> window().groupBy(). I am not
> > sure
> > >> if that is more intuitive. Also, Scala and Java are using the same
> class
> > >> (Table) but different methods (Java uses String parameter, Scala
> > Expression
> > >> parameters). In my opinion it makes sense to have both APIs closely
> > synced.
> > >> So I would either keep window() (after groupBy) for Scala and Java or
> > >> remove it for both.
> > >>
> > >>
> > >>> <RowWindows> & <partitionby() for rowwindow>
> > >>> -+1 to support replace groupby() by partitionby(). BTW, in the case
> of
> > >>> over, instead of partitionby, are we going to support orderby? If
> yes,
> > I
> > >>> would suggest to define rowwindow as  rowwindow(PartionByParaType,
> > >> OrderBy
> > >>> ParaType, WindowParaType).
> > >>>
> > >>>
> > >> The current FLIP-11 proposal supports defining both partitionBy and
> > orderBy
> > >> (with a few restrictions).
> > >> PartitionBy is defined for all windows alike by calling
> > >>
> > >> table.partitionBy(...).rowWindow(Window1 as w1, Window2 as
> > >> w2).select(count() over w1).
> > >>
> > >> Allowing windows with different partitioning would mean that data is
> > >> shuffled to different nodes and that we need a distributed join to
> > assemble
> > >> the result rows. In principle this could be done but would be very
> > >> expensive to execute (applies to batch and streaming). In my opinion,
> we
> > >> should not support this case.
> > >>
> > >> OrderBy is implicitly supported by the on() clause of RowWindows,
> e.g.,
> > >>
> > >> rowWindow(TumbleRows over 10.minutes on ‘rowtime as ‘w)
> > >>
> > >> says that the window is ordered on the rowtime, i.e., event-time,
> > >> attribute. For streaming we can only allow event-time order (or
> > >> processing-time order which is always given). Orders on other
> attributes
> > >> would not be possible (for infinite dynamic input tables) or very
> > expensive
> > >> (memory and computation wise) to maintain (for finite dynamic input
> > >> tables). For queries on batch tables, in principle all orders are
> > possible.
> > >> With the current proposal only count windows are supported for
> arbitrary
> > >> attribute types and time windows for timestamp attributes.
> > >>
> > >> So
> > >>> - moving windows into the groupBy() call :   +1
> > >>>
> > >>
> > >> +1
> > >>
> > >>
> > >>> - making over() for rowWindow() with a single window definition.
> > >>>
> > >>
> > >> +1
> > >>
> > >>
> > >>> - additionally allowing window definitions in over():  +1 yes for
> > scala,
> > >>> but use alias for java API.
> > >>>
> > >>
> > >> If we have the parser code for Java group windows in groupBy() it
> > should be
> > >> easy to adapt this for over(). But we should also keep the rowWindow
> > method
> > >> to define aliases.
> > >>
> > >>
> > >>> - using partitionBy() instead of groupBy() for row windows?: +1, but
> > >>> better to consider orderby, it may be even better to move
> partitionBy()
> > >>> into rowwindow.
> > >>>
> > >>>
> > >> +1 to change groupBy() to partitionBy().
> > >> I would not move partitionBy() into the RowWindow definition but keep
> it
> > >> outside to ensure only one partitioning is defined. The orderBy
> > definition
> > >> is already fluently included in the RowWindow via the on() method.
> > >>
> > >>
> > >>
> > >>> Regards,
> > >>> Shaoxuan
> > >>>
> > >>>
> > >>> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fh...@gmail.com>
> > >> wrote:
> > >>>
> > >>>> Hi everybody,
> > >>>>
> > >>>> happy to see a good discussion here :-)
> > >>>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong
> > question
> > >>>> in a separate mail.
> > >>>>
> > >>>> Shaoxuan, thanks for the suggestions! I think we all agree that for
> > SQL
> > >>>> we should definitely follow the standard (batch) SQL syntax.
> > >>>> In my opinion, the Table API does not necessarily have to be as
> close
> > as
> > >>>> possible to SQL but should try to make a few things easier and also
> > >> safer
> > >>>> (easier is of course subjective).
> > >>>>
> > >>>> - GroupBy without windows: These are currently intentionally not
> > >>>> supported and also not part of FLIP-11. Our motivation for not
> > >> supporting
> > >>>> this, is to guard the user from defining a query that fails when
> being
> > >>>> executed due to a very memory consuming operation. FLIP-11 provides
> a
> > >> way
> > >>>> to define such a query as a sliding row window with unbounded
> > preceding
> > >>>> rows. With the upcoming SQL proposal, queries that consume unbounded
> > >> memory
> > >>>> should be identified and rejected. I would be in favor of allowing
> > >> groupBy
> > >>>> without windows once the guarding mechanism are in place.
> > >>>>
> > >>>> - GroupBy with window: I think this is a question of taste. Having a
> > >>>> window() call, makes the feature more explicit in my opinion.
> However,
> > >> I'm
> > >>>> not opposed to move the windows into the groupBy clause.
> > >>>> Implementation-wise it should be easy to move the window definition
> > >> into to
> > >>>> groupBy clause for the Scala Table API. For the Java Table API we
> > would
> > >>>> need to extend the parser quite a bit because windows would need to
> be
> > >>>> defined as Strings and not via objects.
> > >>>>
> > >>>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW
> > clause
> > >>>> (implemented by PostgreSQL and Calcite) which allows to have
> > "reusable"
> > >>>> window definitions. I think this is a desirable feature. In the
> > FLIP-11
> > >>>> proposal the over() clause in select() refers to the predefined
> > windows
> > >>>> with aliases. In case only one window is defined, the over() clause
> is
> > >>>> optional and the same (and only) window is applied to all
> aggregates.
> > I
> > >>>> think we can make the over() call mandatory to have the windowing
> more
> > >>>> explicit. It should also be possible to extend the over clause to
> > >> directly
> > >>>> accept RowWindows instead of window aliases. I would not make this a
> > >>>> priority at the moment, but a feature that could be later added,
> > because
> > >>>> rowWindow() and over() cover all cases. Similar as for GroupBy with
> > >>>> windows, we would need to extend the parser for the Java Table API
> > >> though.
> > >>>>
> > >>>> Finally, I have an own suggestion:
> > >>>> In FLIP-11, groupBy() is  used to define the partitioning of
> > RowWindows.
> > >>>> I think this should be changed to partitionBy() because groupBy()
> > groups
> > >>>> data and applies an aggregation to all rows of a group which is not
> > >>>> happening here. In original SQL, the OVER clause features a
> PARTITION
> > BY
> > >>>> clause. We are moving this out of the window definition, i.e., OVER
> > >> clause,
> > >>>> to enforce the same partitioning for all windows (different
> > >> partitionings
> > >>>> would be a challenge to execute in a parallel system).
> > >>>>
> > >>>> @Timo and all: What do you think about:
> > >>>>
> > >>>> - moving windows into the groupBy() call
> > >>>> - making over() for rowWindow() with a single window definition
> > >>>> - additionally allowing window definitions in over()
> > >>>> - using partitionBy() instead of groupBy() for row windows?
> > >>>>
> > >>>> Best, Fabian
> > >>>>
> > >>>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zh...@huawei.com>:
> > >>>>
> > >>>>> Hi shaoxuan:
> > >>>>>
> > >>>>> I think,  the streamsql must be excuted in table environment. So I
> > call
> > >>>>> this table API ‘s StreamSQL. What do you call for this, stream
> Table
> > >> API or
> > >>>>> streamsql? It is fu
> > >>>>>
> > >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> > >>>>> val tblEnv = TableEnvironment.getTableEnvironment(env)
> > >>>>> val ds: DataStream[(String,Long, Long)] =
> > >> env.readTextFile("/home/demo")
> > >>>>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
> > >>>>>    .map(f=>(f, 1L, 1L))
> > >>>>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
> > >>>>>
> > >>>>> So in my opinion, the grammar which is marked red should be
> > compatible
> > >>>>> with calcite's StreamSQL grammar.
> > >>>>>
> > >>>>> By the way,  thanks very much for telling me the modified content
> in
> > >>>>> Flink StreamSQL. I will look the new proposal .
> > >>>>>
> > >>>>> Thanks!
> > >>>>> 发件人: Sean Wang [mailto:wshaoxuan@gmail.com]
> > >>>>> 发送时间: 2016年10月13日 16:29
> > >>>>> 收件人: dev@flink.apache.org; Zhangrucong
> > >>>>> 主题: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > >>>>>
> > >>>>> Hi  zhangrucong,
> > >>>>> I am not sure what you mean by "table API'S StreamSQL", I guess you
> > >> mean
> > >>>>> "stream TableAPI"?
> > >>>>> TableAPI should be compatible with calcite SQL. (By compatible, My
> > >>>>> understanding is that both TableAPI and SQL will be translated to
> the
> > >> same
> > >>>>> logical plan - the same set of REL and REX).
> > >>>>> BTW, please note that we recently have merged a change to remove
> > STREAM
> > >>>>> keyword for flink stream SQL(FLINK-4546). In our opinion, batch and
> > >> stream
> > >>>>> are not necessarily to be differentiated at the SQL level. The
> major
> > >>>>> difference between batch and stream is "WHEN and HOW to emit the
> > >> result".
> > >>>>> We have been working on a new proposal with Fabian on this change.
> I
> > >>>>> guess it will be sent out for review very soon.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Shaoxuan
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <
> zhangrucong@huawei.com
> > >>>>> <ma...@huawei.com>> wrote:
> > >>>>> Hi shaoxuan:
> > >>>>> Does the table API'S StreamSQL grammar is compatible with calcite's
> > >>>>> StreamSQL grammar?
> > >>>>>
> > >>>>>
> > >>>>> 1、In calcite, the tumble window is realized by using function
> tumble
> > or
> > >>>>> hop. And the function must be used with group by, like this:
> > >>>>>
> > >>>>> SELECT
> > >>>>>  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
> > >>>>>  productId,
> > >>>>>  COUNT(*) AS c,
> > >>>>>  SUM(units) AS units
> > >>>>> FROM Orders
> > >>>>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
> > >>>>>  productId;
> > >>>>>
> > >>>>> 2、 The sliding window uses keywords "window" and "over". Like this:
> > >>>>>
> > >>>>> SELECT  *
> > >>>>> FROM (
> > >>>>>  SELECT STREAM rowtime,
> > >>>>>    productId,
> > >>>>>    units,
> > >>>>>    AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING)
> AS
> > >>>>> m10,
> > >>>>>    AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
> > >>>>>  FROM Orders
> > >>>>>  WINDOW product AS (
> > >>>>>    ORDER BY rowtime
> > >>>>>    PARTITION BY productId))
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Thanks!
> > >>>>>
> > >>>>> -----邮件原件-----
> > >>>>> 发件人: 王绍翾(大沙) [mailto:shaoxuan.wsx@alibaba-inc.com<mailto:
> > >>>>> shaoxuan.wsx@alibaba-inc.com>]
> > >>>>> 发送时间: 2016年10月13日 2:03
> > >>>>> 收件人: dev@flink.apache.org<ma...@flink.apache.org>
> > >>>>> 主题: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > >>>>>
> > >>>>> Hi Fabian, Timo, and Jark.Thanks for kicking off this FLIP. This
> is a
> > >>>>> really great and promising proposal. I have a few comments to the
> > >> "window"
> > >>>>> operator proposed in this FLIP (I am hoping it is not too late to
> > >> bring up
> > >>>>> this). First, window is not always needed for the stream
> aggregation.
> > >> There
> > >>>>> are cases where we want do an aggreation on a stream, while the
> > >> query/emit
> > >>>>> strategy decides when to emit a streaming output. Second, window is
> > >> needed
> > >>>>> when we want do an aggregation for a certain rage, but window is
> not
> > an
> > >>>>> operator. We basically use window to define the range for
> > aggregation.
> > >> In
> > >>>>> tableAPI, a window should be defined together with "groupby" and
> > >> "select"
> > >>>>> operators, either inside a "groupby" operator or after an "over"
> > >> clause in
> > >>>>> "select" operator. This will make the TableAPI in the similar
> manner
> > >> as SQL.
> > >>>>> For instance,[A groupby without window]
> > >>>>> <Table API>
> > >>>>> val res = tab
> > >>>>> .groupBy(‘a)
> > >>>>> .select(‘a, ‘b.sum)
> > >>>>> <SQL>
> > >>>>> SELECT a, SUM(b)
> > >>>>> FROM tab
> > >>>>> GROUP BY a
> > >>>>> [A tumble window inside groupby]
> > >>>>> <Table API>val res = tab
> > >>>>> .groupBy(‘a, tumble(10.minutes, ‘rowtime)) .select(‘a, ‘b.sum)
> > >>>>> <SQL>SELECT a, SUM(b)FROM tab GROUP BY a, TUMBLE(10.minutes ,
> > >> ‘rowtime) [A
> > >>>>> row tumble window after OVER] <Table API>.groupby('a) //optional
> > >>>>> .select(‘a, ‘b.count over rowTumble(10.minutes,
> ‘rowtime))<SQL>SELECT
> > >> a,
> > >>>>> COUNT(b) OVER ROWTUMBLE(10.minutes, ‘rowtime)FROM tab GROUP BY a
> > >> Please let
> > >>>>> me know what you think.
> > >>>>> Regards,Shaoxuan
> > >>>>> ------------------------------------------------------------
> > >> ------发件人:Fabian
> > >>>>> Hueske <fhueske@gmail.com<mailto:fhueske@gmail.com
> >>发送时间:2016年9月26日
> > >> (星期一)
> > >>>>> 21:13收件人:dev@flink.apache.org<ma...@flink.apache.org> <
> > >>>>> dev@flink.apache.org<ma...@flink.apache.org>>主 题:Re:
> [DISCUSS]
> > >>>>> FLIP-11: Table API Stream Aggregations Hi everybody,
> > >>>>>
> > >>>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
> > >>>>> I will update the status of the FLIP to accepted.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Fabian
> > >>>>>
> > >>>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twalthr@apache.org<mailto:
> > twa
> > >>>>> lthr@apache.org>>:
> > >>>>>
> > >>>>>> Hi Jark,
> > >>>>>>
> > >>>>>> yes I think enough time has passed. We can start implementing the
> > >>>>> changes.
> > >>>>>> What do you think Fabian?
> > >>>>>>
> > >>>>>> If there are no objections, I will create the subtasks in Jira
> > today.
> > >>>>>> For
> > >>>>>> FLIP-11/1 I already have implemented a prototype, I just have to
> do
> > >>>>>> some
> > >>>>>> refactoring/documentation before opening a PR.
> > >>>>>>
> > >>>>>> Timo
> > >>>>>>
> > >>>>>>
> > >>>>>> Am 18/09/16 um 04:46 schrieb Jark Wu:
> > >>>>>>
> > >>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> It seems that there’s no objections to the window design. So
> could
> > >> we
> > >>>>>>> open subtasks to start working on it now ?
> > >>>>>>>
> > >>>>>>> - Jark Wu
> > >>>>>>>
> > >>>>>>> 在 2016年9月7日,下午4:29,Jark Wu <wuchong.wc@alibaba-inc.com<mailto:
> > >>>>> wuchong.wc@alibaba-inc.com>> 写道:
> > >>>>>>>>
> > >>>>>>>> Hi Fabian,
> > >>>>>>>>
> > >>>>>>>> Thanks for sharing your ideas.
> > >>>>>>>>
> > >>>>>>>> They all make sense to me. Regarding to reassigning timestamp, I
> > do
> > >>>>>>>> not
> > >>>>>>>> have an use case. I come up with this because DataStream has a
> > >>>>>>>> TimestampAssigner :)
> > >>>>>>>>
> > >>>>>>>> +1 for this FLIP.
> > >>>>>>>>
> > >>>>>>>> - Jark Wu
> > >>>>>>>>
> > >>>>>>>> 在 2016年9月7日,下午2:59,Fabian Hueske <fhueske@gmail.com<mailto:fhue
> > >>>>> ske@gmail.com> <mailto:
> > >>>>>
> > >>>>>>>>> fhueske@gmail.com<ma...@gmail.com>>> 写道:
> > >>>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> thanks for your comments and questions!
> > >>>>>>>>> Actually, you are bringing up the points that Timo and I
> > discussed
> > >>>>>>>>> the
> > >>>>>>>>> most
> > >>>>>>>>> when designing the FLIP ;-)
> > >>>>>>>>>
> > >>>>>>>>> - We also thought about the syntactic shortcut for running
> > >>>>>>>>> aggregates
> > >>>>>>>>> like
> > >>>>>>>>> you proposed (table.groupBy(‘a).select(…)). Our motivation to
> not
> > >>>>>>>>> allow
> > >>>>>>>>> this shortcut is to prevent users from accidentally performing
> a
> > >>>>>>>>> "dangerous" operation. The problem with unbounded sliding
> > >>>>>>>>> row-windows is
> > >>>>>>>>> that their state does never expire. If you have an evolving key
> > >>>>>>>>> space,
> > >>>>>>>>> you
> > >>>>>>>>> will likely run into problems at some point because the
> operator
> > >>>>>>>>> state
> > >>>>>>>>> grows too large. IMO, a row-window session is a better
> approach,
> > >>>>>>>>> because it
> > >>>>>>>>> defines a timeout after which state can be discarded.
> > >>>>>>>>> groupBy.select is
> > >>>>>>>>> a
> > >>>>>>>>> very common operation in batch but its semantics in streaming
> are
> > >>>>>>>>> very
> > >>>>>>>>> different. In my opinion it makes sense to make users aware of
> > >>>>>>>>> these
> > >>>>>>>>> differences through the API.
> > >>>>>>>>>
> > >>>>>>>>> - Reassigning timestamps and watermarks is a very delicate
> issue.
> > >>>>>>>>> You
> > >>>>>>>>> are
> > >>>>>>>>> right, that Calcite exposes this field which is necessary due
> to
> > >>>>>>>>> the
> > >>>>>>>>> semantics of SQL. However, also in Calcite you cannot freely
> > >> choose
> > >>>>>>>>> the
> > >>>>>>>>> timestamp attribute for streaming queries (it must be a
> monotone
> > >> or
> > >>>>>>>>> quasi-monotone attribute) which is hard to reason about (and
> > >>>>>>>>> guarantee)
> > >>>>>>>>> after a few operators have been applied. Streaming tables in
> > Flink
> > >>>>>>>>> will
> > >>>>>>>>> likely have a time attribute which is identical to the initial
> > >>>>> rowtime.
> > >>>>>>>>> However, Flink does modify timestamps internally, e.g., for
> > >> records
> > >>>>>>>>> that
> > >>>>>>>>> are emitted from time windows, in order to ensure that
> > consecutive
> > >>>>>>>>> windows
> > >>>>>>>>> perform as expected. Modify or reassign timestamps in the
> middle
> > >> of
> > >>>>>>>>> a
> > >>>>>>>>> job
> > >>>>>>>>> can result in unexpected results which are very hard to reason
> > >>>>>>>>> about. Do
> > >>>>>>>>> you have a concrete use case in mind for reassigning
> timestamps?
> > >>>>>>>>>
> > >>>>>>>>> - The idea to represent rowtime and systime as object is good.
> > Our
> > >>>>>>>>> motivation to go for reserved Scala symbols was to have a
> uniform
> > >>>>>>>>> syntax
> > >>>>>>>>> with windows over streaming and batch tables. On batch tables
> you
> > >>>>>>>>> can
> > >>>>>>>>> compute time windows basically over every time attribute (they
> > are
> > >>>>>>>>> treated
> > >>>>>>>>> similar to grouping attributes with a bit of extra logic to
> > >> extract
> > >>>>>>>>> the
> > >>>>>>>>> grouping key for sliding and session windows). If you write
> > >>>>>>>>> window(Tumble
> > >>>>>>>>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime
> would
> > >>>>>>>>> indicate
> > >>>>>>>>> event-time. On a batch table with a 'rowtime attribute, the
> same
> > >>>>>>>>> operator
> > >>>>>>>>> would be internally converted into a group by. By going for the
> > >>>>>>>>> object
> > >>>>>>>>> approach we would lose this compatibility (or would need to
> > >>>>>>>>> introduce an
> > >>>>>>>>> additional column attribute to specifiy the window attribute
> for
> > >>>>>>>>> batch
> > >>>>>>>>> tables).
> > >>>>>>>>>
> > >>>>>>>>> As usual some of the design decisions are based on preferences.
> > >>>>>>>>> Do they make sense to you? Let me know what you think.
> > >>>>>>>>>
> > >>>>>>>>> Best, Fabian
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com
> <ma
> > >>>>> ilto:wuchong.wc@alibaba-inc.com> <mailto:
> > >>>>>
> > >>>>>>>>> wuchong.wc@alibaba-inc.com<mailto:wuchong.wc@alibaba-inc.com
> >>>:
> > >>>>>>>>>
> > >>>>>>>>> Hi all,
> > >>>>>>>>>>
> > >>>>>>>>>> I'm on vacation for about five days , sorry to have missed
> this
> > >>>>>>>>>> great
> > >>>>>>>>>> FLIP.
> > >>>>>>>>>>
> > >>>>>>>>>> Yes, the non-windowed aggregates is a special case of
> > row-window.
> > >>>>>>>>>> And
> > >>>>>>>>>> the
> > >>>>>>>>>> proposal looks really good.  Can we have a simplified form for
> > >> the
> > >>>>>>>>>> special
> > >>>>>>>>>> case? Such as : table.groupBy(‘a).rowWindow(Sl
> > >>>>>>>>>> ideRows.unboundedPreceding).select(…)
> > >>>>>>>>>> can be simplified to  table.groupBy(‘a).select(…). The latter
> > >> will
> > >>>>>>>>>> actually
> > >>>>>>>>>> call the former.
> > >>>>>>>>>>
> > >>>>>>>>>> Another question is about the rowtime. As the FLIP said,
> > >>>>>>>>>> DataStream and
> > >>>>>>>>>> StreamTableSource is responsible to assign timestamps and
> > >>>>>>>>>> watermarks,
> > >>>>>>>>>> furthermore “rowtime” and “systemtime” are not real column.
> IMO,
> > >>>>>>>>>> it is
> > >>>>>>>>>> different with Calcite’s rowtime, which is a real column in
> the
> > >>>>> table.
> > >>>>>>>>>> In
> > >>>>>>>>>> FLIP's way, we will lose some flexibility. Because the
> timestamp
> > >>>>>>>>>> column may
> > >>>>>>>>>> be created after some transformations or join operation, not
> > >>>>>>>>>> created at
> > >>>>>>>>>> beginning. So why do we have to define rowtime at beginning?
> > >>>>>>>>>> (because
> > >>>>>>>>>> of
> > >>>>>>>>>> watermark?)     Can we have a way to define rowtime after
> source
> > >>>>>>>>>> table
> > >>>>>>>>>> like
> > >>>>>>>>>> TimestampAssinger?
> > >>>>>>>>>>
> > >>>>>>>>>> Regarding to “allowLateness” method. I come up a trick that we
> > >> can
> > >>>>>>>>>> make
> > >>>>>>>>>> ‘rowtime and ‘system to be a Scala object, not a symbol
> > >> expression.
> > >>>>>>>>>> The API
> > >>>>>>>>>> will looks like this :
> > >>>>>>>>>>
> > >>>>>>>>>> window(Tumble over 10.minutes on rowtime allowLateness as ‘w)
> > >>>>>>>>>>
> > >>>>>>>>>> The implementation will look like this:
> > >>>>>>>>>>
> > >>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > >>>>>>>>>>  def on(time: rowtime.type): TumblingEventTimeWindow =
> > >>>>>>>>>>      new TumblingEventTimeWindow(alias, ‘rowtime, size)
> > >> //
> > >>>>>>>>>> has
> > >>>>>>>>>> allowLateness() method
> > >>>>>>>>>>
> > >>>>>>>>>>  def on(time: systemtime.type): TumblingProcessingTimeWindow=
> > >>>>>>>>>>     new TumblingProcessingTimeWindow(alias, ‘systemtime,
> size)
> > >>>>>>>>>> // hasn’t allowLateness() method
> > >>>>>>>>>> }
> > >>>>>>>>>> object rowtime
> > >>>>>>>>>> object systemtime
> > >>>>>>>>>>
> > >>>>>>>>>> What do you think about this?
> > >>>>>>>>>>
> > >>>>>>>>>> - Jark Wu
> > >>>>>>>>>>
> > >>>>>>>>>> 在 2016年9月6日,下午11:00,Timo Walther <twalthr@apache.org<mailto:
> twa
> > >>>>> lthr@apache.org> <mailto:
> > >>>>>
> > >>>>>>>>>>> twalthr@apache.org<ma...@apache.org>>> 写道:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi all,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I thought about the API of the FLIP again. If we allow the
> > >>>>>>>>>>> "systemtime"
> > >>>>>>>>>>>
> > >>>>>>>>>> attribute, we cannot implement a nice method chaining where
> the
> > >>>>>>>>>> user
> > >>>>>>>>>> can
> > >>>>>>>>>> define a "allowLateness" only on event time. So even if the
> user
> > >>>>>>>>>> expressed
> > >>>>>>>>>> that "systemtime" is used we have to offer a "allowLateness"
> > >>>>>>>>>> method
> > >>>>>>>>>> because
> > >>>>>>>>>> we have to assume that this attribute can also be the batch
> > event
> > >>>>>>>>>> time
> > >>>>>>>>>> column, which is not very nice.
> > >>>>>>>>>>
> > >>>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > >>>>>>>>>>> def on(timeField: Expression): TumblingEventTimeWindow =
> > >>>>>>>>>>>   new TumblingEventTimeWindow(alias, timeField, size) // has
> > >>>>>>>>>>>
> > >>>>>>>>>> allowLateness() method
> > >>>>>>>>>>
> > >>>>>>>>>>> }
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do you think?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Timo
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Am 05/09/16 um 10:41 schrieb Fabian Hueske:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Jark,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> you had asked for non-windowed aggregates in the Table API a
> > >> few
> > >>>>>>>>>>>> times.
> > >>>>>>>>>>>> FLIP-11 proposes row-window aggregates which are a
> > >>>>>>>>>>>> generalization of
> > >>>>>>>>>>>> running aggregates (SlideRow unboundedPreceding).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Can you have a look at the FLIP and give feedback whether
> this
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>> what
> > >>>>>>>>>>>>
> > >>>>>>>>>>> you
> > >>>>>>>>>>
> > >>>>>>>>>>> are looking for?
> > >>>>>>>>>>>> Improvement suggestions are very welcome as well.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thank you,
> > >>>>>>>>>>>> Fabian
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 2016-09-01 16<tel:2016-09-01%C2%A016>:12 GMT+02:00 Timo
> > Walther
> > >>>>> <tw...@apache.org> <mailto:
> > >>>>>>>>>>>> twalthr@apache.org<ma...@apache.org>>>:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hi all!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in
> the
> > >>>>>>>>>>>>> Table
> > >>>>>>>>>>>>> API.
> > >>>>>>>>>>>>> You can find the FLIP-11 here:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%
> <h
> > >>>>> ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25> <
> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%
> <h
> > >>>>> ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25>>
> > >>>>>
> > >>>>>>>>>>>>> 3A+Table+API+Stream+Aggregations
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Motivation for the FLIP:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The Table API is a declarative API to define queries on
> > static
> > >>>>>>>>>>>>> and
> > >>>>>>>>>>>>> streaming tables. So far, only projection, selection, and
> > >> union
> > >>>>>>>>>>>>> are
> > >>>>>>>>>>>>> supported operations on streaming tables.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> This FLIP proposes to add support for different types of
> > >>>>>>>>>>>>> aggregations
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> on
> > >>>>>>>>>>
> > >>>>>>>>>>> top of streaming tables. In particular, we seek to support:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> - Group-window aggregates, i.e., aggregates which are
> > computed
> > >>>>>>>>>>>>> for a
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> group
> > >>>>>>>>>>
> > >>>>>>>>>>> of elements. A (time or row-count) window is required to
> bound
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> infinite
> > >>>>>>>>>>
> > >>>>>>>>>>> input stream into a finite group.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> - Row-window aggregates, i.e., aggregates which are
> computed
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>> each
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> row,
> > >>>>>>>>>>
> > >>>>>>>>>>> based on a window (range) of preceding and succeeding rows.
> > >>>>>>>>>>>>> Each type of aggregate shall be supported on keyed/grouped
> or
> > >>>>>>>>>>>>> non-keyed/grouped data streams for streaming tables as well
> > as
> > >>>>>>>>>>>>> batch
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> tables.
> > >>>>>>>>>>
> > >>>>>>>>>>> We are looking forward to your feedback.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Timo
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> Freundliche Grüße / Kind Regards
> > >>>>>>>>>>>
> > >>>>>>>>>>> Timo Walther
> > >>>>>>>>>>>
> > >>>>>>>>>>> Follow me: @twalthr
> > >>>>>>>>>>> https://www.linkedin.com/in/twalthr
> > >>>>>>>>>>> <https://www.linkedin.com/in/t
> > >>>>>>>>>>> walthr>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> Freundliche Grüße / Kind Regards
> > >>>>>>
> > >>>>>> Timo Walther
> > >>>>>>
> > >>>>>> Follow me: @twalthr
> > >>>>>> https://www.linkedin.com/in/twalthr
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>

Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks everybody for the input.

I updated the FLIP-11 document [1] and did the following changes:

- made over() mandatory also for single row windows
- removed allowedLateness() from windows which would mean to add some kind
of retraction output mode. This is out-of-scope for FLIP-11 and is
addressed in the Stream SQL design doc [2].
- removed the 'systemtime keyword to distinguish event-time and
processing-time on a syntactical level (proposed by Timo in PR #2562 [3]).

I did *not* do the following changes that were discussed in this thread:

- change groupBy() to partitionBy() for row windows
- allow window specification in groupBy()
- allow row window specification in over()
- allow groupBy() without group window on streaming tables

Best,
Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
[2]
https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
[3] https://github.com/apache/flink/pull/2562

2016-11-02 4:49 GMT+01:00 Sean Wang <ws...@gmail.com>:

> Hi Stephan and Fabian,
> Thanks for the replies and very happy to see that my questions have raised
> a good discussion here. This is more than just FLIP-11, but the future of
> flink SQL (glad all of us agree to use the standard compliant SQL
> interface) and the roll of Table API.
> The syntax of TableAPI is just one's favour, but IMO, the tradeoff between
> SQL syntax and TableAPI flexibility should be well considered during the
> design. I hope to see a powerful, concise, and compatible TableAPI.
> I agree with your other comments on Flip11.
>
> Regards,
> Shaoxuan
>
>
> On Fri, Oct 28, 2016 at 11:09 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> > Thanks for bringing up this point Stephan.
> >
> > I think it is a good decision to have a standard compliant SQL interface
> > that SQL developers and tools can use. The SQL proposal [1] I posted a
> few
> > days ago follows this approach.
> >
> > However, SQL was not designed with stream processing in mind and many
> > important streaming concepts are either cumbersome to express or not at
> all
> > expressible with plain SQL (see Tyler Akidau's [2] proposal for
> instance).
> > I think the Table API has the chance to improve a lot of these issues by
> > not following the SQL standard too closely.
> >
> > +1 for defining window aggregates with the explicit the window() and
> > rowWindow() clauses.
> >
> > +1 for sticking to groupBy() instead of partitionBy().
> >
> > There are a few more ideas that can make the Table API better suited for
> > streaming such as:
> >
> > - have built-in emission and refinement strategies (see StreamSQL doc
> [1])
> > - easier joins against time-variant tables (Temporal table proposal of
> > Julian Hyde [3])
> > - support for DataStream-like UDFs
> >
> > These issues should be discussed separately though.
> >
> > Best,
> > Fabian
> >
> > [1]
> > https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQ
> > PW4tnl8THw6rzGUdaqU
> > [2]
> > https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU
> > 6BERXXDZ3t0HzSLij9Q
> > [3]
> > https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szp
> > kbGqFMBtzYiIY4dHe0Q
> >
> > 2016-10-28 10:05 GMT+02:00 Jark Wu <wu...@alibaba-inc.com>:
> >
> > > Hi ,
> > >
> > > I agree that Table API should be a SQL-like API, that do not need to be
> > > strictly consistent with SQL.
> > >
> > > > - Not put the window definition into the groupBy clause.
> > >
> > > +1  Putting the window clause out of groupBy, will be easy to learn and
> > > easy to discover for users.
> > >
> > > > I like the idea of having separate ".window()" and ".rowWindow()"
> > > > clauses.
> > >
> > > +1
> > >
> > > >  I would prefer to not have a "partitionBy" statement.
> > >
> > > As far as I know, the partitionBy is playing the same role with groupBy
> > in
> > > Table API. So maybe groupBy is enough, there is no need to introduce a
> > new
> > > clause.
> > >
> > >
> > > - Jark Wu
> > >
> > > > 在 2016年10月27日,上午2:22,Stephan Ewen <se...@apache.org> 写道:
> > > >
> > > > Hi all!
> > > >
> > > > I think that in order to get a better hold on how we what to build
> the
> > > > Table API, we need to *decide what the role of the Table API should
> > be*.
> > > We
> > > > touched on that a few times, but I think we still have different
> ideas
> > > > about that.
> > > >
> > > > To get there, let me take back a step and look at the design of
> Stream
> > > SQL
> > > > again. There were basically two competing approaches:
> > > >
> > > > (1) Keep SQL as it is and make it run on infinite streams via
> > introducing
> > > > dynamic tables
> > > > (2) Do a new language that is similar to SQL, but designed with
> > streaming
> > > > concepts in mind (first class support for time and windows)
> > > >
> > > > Both approaches had good points. The Stream SQL design doc posted
> > > followed
> > > > approach (1) - keep SQL as it is.
> > > >
> > > >
> > > >
> > > > Now, for the Table API, we seem to be having a similar discussion
> > again.
> > > >
> > > > (1) Let the Table API be as similar to SQL as possible, simply make
> it
> > > feel
> > > > "fluently embedded" in Scala.
> > > > (2) Define the Table API as one would define a new and clean DSL for
> > > > streaming. SQL inspired, of course. Where SQL syntax feels natural,
> use
> > > the
> > > > SQL syntax, but make it very accessible to Java/Scala (non-SQL)
> > > programmers.
> > > >
> > > >
> > > > I am personally more in favor of variant (2) for the following
> reasons:
> > > >
> > > >  - We already have SQL compliant with the standards and tools.
> > > >
> > > >  - Mirroring SQL too closely into the Table API has the marginal
> > benefit
> > > > that someone close to SQL will find it a bit more familiar. Not sure
> if
> > > > that is even the case, as they have to re-learn the fluent DSL and
> > Scala
> > > > concepts
> > > >
> > > >  - We are making it more difficult for all those that come from a
> more
> > > > Scala/Java DataStream background and simply want to move "a layer
> up",
> > > > getting schema and more optimizations into the equation.
> > > >
> > > >
> > > >
> > > >
> > > > What would that mean for the specific issues that are discussed in
> > > FLIP-11?
> > > > Based on interpreting the Table API as re-imaged streaming DSL, I
> would
> > > > suggest to
> > > >
> > > >  - Not put the window definition into the groupBy clause. It just is
> > > > unexpected for all that are not super familiar with SQL and hard to
> > > > discover in the IDE. A separate window clause is simpler for users
> > coming
> > > > from the DataStream background (or other streaming APIs) and it is
> more
> > > > discoverable in the IDE.
> > > >
> > > >  - I like the idea of having separate ".window()" and ".rowWindow()"
> > > > clauses. Makes it more explicit that very different things will
> happen.
> > > >
> > > >  - I would prefer to not have a "partitionBy" statement. When we
> > restrain
> > > > the Table API at least initially to having one partitioning for the
> > > > windows, we should be able to express the partitioning by simply
> adding
> > > it
> > > > to the fields in the "groupBy" clause. That would make the API easier
> > > > accessible to users that not SQL powerusers.
> > > >
> > > >
> > > > What do others think?
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > > On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske <fh...@gmail.com>
> > > wrote:
> > > >
> > > >> Thanks for your reply Shaoxuan!
> > > >>
> > > >> Please see my replies below.
> > > >>
> > > >> Best, Fabian
> > > >>
> > > >> 2016-10-14 11:34 GMT+02:00 Sean Wang <ws...@gmail.com>:
> > > >>
> > > >>> Thanks for your quick reply, Fabian.
> > > >>>
> > > >>> I have a few minor comments&suggestions:
> > > >>>
> > > >>> <GroupBy with window>
> > > >>> - Agree that we should consider GroupBy without window after the
> new
> > > SQL
> > > >>> proposal is settled down.
> > > >>>
> > > >>>
> > > >> OK, so we keep this as it is for now. GroupBy without windows will
> be
> > > >> supported later when we are able to "guard" the memory requirements
> of
> > > that
> > > >> operation.
> > > >>
> > > >>
> > > >>> <GroupBy with window>
> > > >>> - For Java API, we can keep window() call, and put window alias
> into
> > > >>> Groupby clause. This can be also applied to rowwindow case.
> > > >>>
> > > >>>
> > > >> Referring to the window alias in the groupBy clause would require to
> > > invert
> > > >> the methods, i.e., groupBy().window() -> window().groupBy(). I am
> not
> > > sure
> > > >> if that is more intuitive. Also, Scala and Java are using the same
> > class
> > > >> (Table) but different methods (Java uses String parameter, Scala
> > > Expression
> > > >> parameters). In my opinion it makes sense to have both APIs closely
> > > synced.
> > > >> So I would either keep window() (after groupBy) for Scala and Java
> or
> > > >> remove it for both.
> > > >>
> > > >>
> > > >>> <RowWindows> & <partitionby() for rowwindow>
> > > >>> -+1 to support replace groupby() by partitionby(). BTW, in the case
> > of
> > > >>> over, instead of partitionby, are we going to support orderby? If
> > yes,
> > > I
> > > >>> would suggest to define rowwindow as  rowwindow(PartionByParaType,
> > > >> OrderBy
> > > >>> ParaType, WindowParaType).
> > > >>>
> > > >>>
> > > >> The current FLIP-11 proposal supports defining both partitionBy and
> > > orderBy
> > > >> (with a few restrictions).
> > > >> PartitionBy is defined for all windows alike by calling
> > > >>
> > > >> table.partitionBy(...).rowWindow(Window1 as w1, Window2 as
> > > >> w2).select(count() over w1).
> > > >>
> > > >> Allowing windows with different partitioning would mean that data is
> > > >> shuffled to different nodes and that we need a distributed join to
> > > assemble
> > > >> the result rows. In principle this could be done but would be very
> > > >> expensive to execute (applies to batch and streaming). In my
> opinion,
> > we
> > > >> should not support this case.
> > > >>
> > > >> OrderBy is implicitly supported by the on() clause of RowWindows,
> > e.g.,
> > > >>
> > > >> rowWindow(TumbleRows over 10.minutes on ‘rowtime as ‘w)
> > > >>
> > > >> says that the window is ordered on the rowtime, i.e., event-time,
> > > >> attribute. For streaming we can only allow event-time order (or
> > > >> processing-time order which is always given). Orders on other
> > attributes
> > > >> would not be possible (for infinite dynamic input tables) or very
> > > expensive
> > > >> (memory and computation wise) to maintain (for finite dynamic input
> > > >> tables). For queries on batch tables, in principle all orders are
> > > possible.
> > > >> With the current proposal only count windows are supported for
> > arbitrary
> > > >> attribute types and time windows for timestamp attributes.
> > > >>
> > > >> So
> > > >>> - moving windows into the groupBy() call :   +1
> > > >>>
> > > >>
> > > >> +1
> > > >>
> > > >>
> > > >>> - making over() for rowWindow() with a single window definition.
> > > >>>
> > > >>
> > > >> +1
> > > >>
> > > >>
> > > >>> - additionally allowing window definitions in over():  +1 yes for
> > > scala,
> > > >>> but use alias for java API.
> > > >>>
> > > >>
> > > >> If we have the parser code for Java group windows in groupBy() it
> > > should be
> > > >> easy to adapt this for over(). But we should also keep the rowWindow
> > > method
> > > >> to define aliases.
> > > >>
> > > >>
> > > >>> - using partitionBy() instead of groupBy() for row windows?: +1,
> but
> > > >>> better to consider orderby, it may be even better to move
> > partitionBy()
> > > >>> into rowwindow.
> > > >>>
> > > >>>
> > > >> +1 to change groupBy() to partitionBy().
> > > >> I would not move partitionBy() into the RowWindow definition but
> keep
> > it
> > > >> outside to ensure only one partitioning is defined. The orderBy
> > > definition
> > > >> is already fluently included in the RowWindow via the on() method.
> > > >>
> > > >>
> > > >>
> > > >>> Regards,
> > > >>> Shaoxuan
> > > >>>
> > > >>>
> > > >>> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fh...@gmail.com>
> > > >> wrote:
> > > >>>
> > > >>>> Hi everybody,
> > > >>>>
> > > >>>> happy to see a good discussion here :-)
> > > >>>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong
> > > question
> > > >>>> in a separate mail.
> > > >>>>
> > > >>>> Shaoxuan, thanks for the suggestions! I think we all agree that
> for
> > > SQL
> > > >>>> we should definitely follow the standard (batch) SQL syntax.
> > > >>>> In my opinion, the Table API does not necessarily have to be as
> > close
> > > as
> > > >>>> possible to SQL but should try to make a few things easier and
> also
> > > >> safer
> > > >>>> (easier is of course subjective).
> > > >>>>
> > > >>>> - GroupBy without windows: These are currently intentionally not
> > > >>>> supported and also not part of FLIP-11. Our motivation for not
> > > >> supporting
> > > >>>> this, is to guard the user from defining a query that fails when
> > being
> > > >>>> executed due to a very memory consuming operation. FLIP-11
> provides
> > a
> > > >> way
> > > >>>> to define such a query as a sliding row window with unbounded
> > > preceding
> > > >>>> rows. With the upcoming SQL proposal, queries that consume
> unbounded
> > > >> memory
> > > >>>> should be identified and rejected. I would be in favor of allowing
> > > >> groupBy
> > > >>>> without windows once the guarding mechanism are in place.
> > > >>>>
> > > >>>> - GroupBy with window: I think this is a question of taste.
> Having a
> > > >>>> window() call, makes the feature more explicit in my opinion.
> > However,
> > > >> I'm
> > > >>>> not opposed to move the windows into the groupBy clause.
> > > >>>> Implementation-wise it should be easy to move the window
> definition
> > > >> into to
> > > >>>> groupBy clause for the Scala Table API. For the Java Table API we
> > > would
> > > >>>> need to extend the parser quite a bit because windows would need
> to
> > be
> > > >>>> defined as Strings and not via objects.
> > > >>>>
> > > >>>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW
> > > clause
> > > >>>> (implemented by PostgreSQL and Calcite) which allows to have
> > > "reusable"
> > > >>>> window definitions. I think this is a desirable feature. In the
> > > FLIP-11
> > > >>>> proposal the over() clause in select() refers to the predefined
> > > windows
> > > >>>> with aliases. In case only one window is defined, the over()
> clause
> > is
> > > >>>> optional and the same (and only) window is applied to all
> > aggregates.
> > > I
> > > >>>> think we can make the over() call mandatory to have the windowing
> > more
> > > >>>> explicit. It should also be possible to extend the over clause to
> > > >> directly
> > > >>>> accept RowWindows instead of window aliases. I would not make
> this a
> > > >>>> priority at the moment, but a feature that could be later added,
> > > because
> > > >>>> rowWindow() and over() cover all cases. Similar as for GroupBy
> with
> > > >>>> windows, we would need to extend the parser for the Java Table API
> > > >> though.
> > > >>>>
> > > >>>> Finally, I have an own suggestion:
> > > >>>> In FLIP-11, groupBy() is  used to define the partitioning of
> > > RowWindows.
> > > >>>> I think this should be changed to partitionBy() because groupBy()
> > > groups
> > > >>>> data and applies an aggregation to all rows of a group which is
> not
> > > >>>> happening here. In original SQL, the OVER clause features a
> > PARTITION
> > > BY
> > > >>>> clause. We are moving this out of the window definition, i.e.,
> OVER
> > > >> clause,
> > > >>>> to enforce the same partitioning for all windows (different
> > > >> partitionings
> > > >>>> would be a challenge to execute in a parallel system).
> > > >>>>
> > > >>>> @Timo and all: What do you think about:
> > > >>>>
> > > >>>> - moving windows into the groupBy() call
> > > >>>> - making over() for rowWindow() with a single window definition
> > > >>>> - additionally allowing window definitions in over()
> > > >>>> - using partitionBy() instead of groupBy() for row windows?
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zh...@huawei.com>:
> > > >>>>
> > > >>>>> Hi shaoxuan:
> > > >>>>>
> > > >>>>> I think,  the streamsql must be excuted in table environment. So
> I
> > > call
> > > >>>>> this table API ‘s StreamSQL. What do you call for this, stream
> > Table
> > > >> API or
> > > >>>>> streamsql? It is fu
> > > >>>>>
> > > >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >>>>> val tblEnv = TableEnvironment.getTableEnvironment(env)
> > > >>>>> val ds: DataStream[(String,Long, Long)] =
> > > >> env.readTextFile("/home/demo")
> > > >>>>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
> > > >>>>>    .map(f=>(f, 1L, 1L))
> > > >>>>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE
> userID='A'")
> > > >>>>>
> > > >>>>> So in my opinion, the grammar which is marked red should be
> > > compatible
> > > >>>>> with calcite's StreamSQL grammar.
> > > >>>>>
> > > >>>>> By the way,  thanks very much for telling me the modified content
> > in
> > > >>>>> Flink StreamSQL. I will look the new proposal .
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>> 发件人: Sean Wang [mailto:wshaoxuan@gmail.com]
> > > >>>>> 发送时间: 2016年10月13日 16:29
> > > >>>>> 收件人: dev@flink.apache.org; Zhangrucong
> > > >>>>> 主题: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > >>>>> Hi  zhangrucong,
> > > >>>>> I am not sure what you mean by "table API'S StreamSQL", I guess
> you
> > > >> mean
> > > >>>>> "stream TableAPI"?
> > > >>>>> TableAPI should be compatible with calcite SQL. (By compatible,
> My
> > > >>>>> understanding is that both TableAPI and SQL will be translated to
> > the
> > > >> same
> > > >>>>> logical plan - the same set of REL and REX).
> > > >>>>> BTW, please note that we recently have merged a change to remove
> > > STREAM
> > > >>>>> keyword for flink stream SQL(FLINK-4546). In our opinion, batch
> and
> > > >> stream
> > > >>>>> are not necessarily to be differentiated at the SQL level. The
> > major
> > > >>>>> difference between batch and stream is "WHEN and HOW to emit the
> > > >> result".
> > > >>>>> We have been working on a new proposal with Fabian on this
> change.
> > I
> > > >>>>> guess it will be sent out for review very soon.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Shaoxuan
> > > >>>>>
> > > >>>>>
> > > >>>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <
> > zhangrucong@huawei.com
> > > >>>>> <ma...@huawei.com>> wrote:
> > > >>>>> Hi shaoxuan:
> > > >>>>> Does the table API'S StreamSQL grammar is compatible with
> calcite's
> > > >>>>> StreamSQL grammar?
> > > >>>>>
> > > >>>>>
> > > >>>>> 1、In calcite, the tumble window is realized by using function
> > tumble
> > > or
> > > >>>>> hop. And the function must be used with group by, like this:
> > > >>>>>
> > > >>>>> SELECT
> > > >>>>>  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS
> rowtime,
> > > >>>>>  productId,
> > > >>>>>  COUNT(*) AS c,
> > > >>>>>  SUM(units) AS units
> > > >>>>> FROM Orders
> > > >>>>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
> > > >>>>>  productId;
> > > >>>>>
> > > >>>>> 2、 The sliding window uses keywords "window" and "over". Like
> this:
> > > >>>>>
> > > >>>>> SELECT  *
> > > >>>>> FROM (
> > > >>>>>  SELECT STREAM rowtime,
> > > >>>>>    productId,
> > > >>>>>    units,
> > > >>>>>    AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING)
> > AS
> > > >>>>> m10,
> > > >>>>>    AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS
> d7
> > > >>>>>  FROM Orders
> > > >>>>>  WINDOW product AS (
> > > >>>>>    ORDER BY rowtime
> > > >>>>>    PARTITION BY productId))
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> -----邮件原件-----
> > > >>>>> 发件人: 王绍翾(大沙) [mailto:shaoxuan.wsx@alibaba-inc.com<mailto:
> > > >>>>> shaoxuan.wsx@alibaba-inc.com>]
> > > >>>>> 发送时间: 2016年10月13日 2:03
> > > >>>>> 收件人: dev@flink.apache.org<ma...@flink.apache.org>
> > > >>>>> 主题: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > >>>>> Hi Fabian, Timo, and Jark.Thanks for kicking off this FLIP. This
> > is a
> > > >>>>> really great and promising proposal. I have a few comments to the
> > > >> "window"
> > > >>>>> operator proposed in this FLIP (I am hoping it is not too late to
> > > >> bring up
> > > >>>>> this). First, window is not always needed for the stream
> > aggregation.
> > > >> There
> > > >>>>> are cases where we want do an aggreation on a stream, while the
> > > >> query/emit
> > > >>>>> strategy decides when to emit a streaming output. Second, window
> is
> > > >> needed
> > > >>>>> when we want do an aggregation for a certain rage, but window is
> > not
> > > an
> > > >>>>> operator. We basically use window to define the range for
> > > aggregation.
> > > >> In
> > > >>>>> tableAPI, a window should be defined together with "groupby" and
> > > >> "select"
> > > >>>>> operators, either inside a "groupby" operator or after an "over"
> > > >> clause in
> > > >>>>> "select" operator. This will make the TableAPI in the similar
> > manner
> > > >> as SQL.
> > > >>>>> For instance,[A groupby without window]
> > > >>>>> <Table API>
> > > >>>>> val res = tab
> > > >>>>> .groupBy(‘a)
> > > >>>>> .select(‘a, ‘b.sum)
> > > >>>>> <SQL>
> > > >>>>> SELECT a, SUM(b)
> > > >>>>> FROM tab
> > > >>>>> GROUP BY a
> > > >>>>> [A tumble window inside groupby]
> > > >>>>> <Table API>val res = tab
> > > >>>>> .groupBy(‘a, tumble(10.minutes, ‘rowtime)) .select(‘a, ‘b.sum)
> > > >>>>> <SQL>SELECT a, SUM(b)FROM tab GROUP BY a, TUMBLE(10.minutes ,
> > > >> ‘rowtime) [A
> > > >>>>> row tumble window after OVER] <Table API>.groupby('a) //optional
> > > >>>>> .select(‘a, ‘b.count over rowTumble(10.minutes,
> > ‘rowtime))<SQL>SELECT
> > > >> a,
> > > >>>>> COUNT(b) OVER ROWTUMBLE(10.minutes, ‘rowtime)FROM tab GROUP BY a
> > > >> Please let
> > > >>>>> me know what you think.
> > > >>>>> Regards,Shaoxuan
> > > >>>>> ------------------------------------------------------------
> > > >> ------发件人:Fabian
> > > >>>>> Hueske <fhueske@gmail.com<mailto:fhueske@gmail.com
> > >>发送时间:2016年9月26日
> > > >> (星期一)
> > > >>>>> 21:13收件人:dev@flink.apache.org<ma...@flink.apache.org> <
> > > >>>>> dev@flink.apache.org<ma...@flink.apache.org>>主 题:Re:
> > [DISCUSS]
> > > >>>>> FLIP-11: Table API Stream Aggregations Hi everybody,
> > > >>>>>
> > > >>>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
> > > >>>>> I will update the status of the FLIP to accepted.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Fabian
> > > >>>>>
> > > >>>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twalthr@apache.org
> <mailto:
> > > twa
> > > >>>>> lthr@apache.org>>:
> > > >>>>>
> > > >>>>>> Hi Jark,
> > > >>>>>>
> > > >>>>>> yes I think enough time has passed. We can start implementing
> the
> > > >>>>> changes.
> > > >>>>>> What do you think Fabian?
> > > >>>>>>
> > > >>>>>> If there are no objections, I will create the subtasks in Jira
> > > today.
> > > >>>>>> For
> > > >>>>>> FLIP-11/1 I already have implemented a prototype, I just have to
> > do
> > > >>>>>> some
> > > >>>>>> refactoring/documentation before opening a PR.
> > > >>>>>>
> > > >>>>>> Timo
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Am 18/09/16 um 04:46 schrieb Jark Wu:
> > > >>>>>>
> > > >>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> It seems that there’s no objections to the window design. So
> > could
> > > >> we
> > > >>>>>>> open subtasks to start working on it now ?
> > > >>>>>>>
> > > >>>>>>> - Jark Wu
> > > >>>>>>>
> > > >>>>>>> 在 2016年9月7日,下午4:29,Jark Wu <wuchong.wc@alibaba-inc.com<mailto:
> > > >>>>> wuchong.wc@alibaba-inc.com>> 写道:
> > > >>>>>>>>
> > > >>>>>>>> Hi Fabian,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for sharing your ideas.
> > > >>>>>>>>
> > > >>>>>>>> They all make sense to me. Regarding to reassigning
> timestamp, I
> > > do
> > > >>>>>>>> not
> > > >>>>>>>> have an use case. I come up with this because DataStream has a
> > > >>>>>>>> TimestampAssigner :)
> > > >>>>>>>>
> > > >>>>>>>> +1 for this FLIP.
> > > >>>>>>>>
> > > >>>>>>>> - Jark Wu
> > > >>>>>>>>
> > > >>>>>>>> 在 2016年9月7日,下午2:59,Fabian Hueske <fhueske@gmail.com<mailto:
> fhue
> > > >>>>> ske@gmail.com> <mailto:
> > > >>>>>
> > > >>>>>>>>> fhueske@gmail.com<ma...@gmail.com>>> 写道:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi,
> > > >>>>>>>>>
> > > >>>>>>>>> thanks for your comments and questions!
> > > >>>>>>>>> Actually, you are bringing up the points that Timo and I
> > > discussed
> > > >>>>>>>>> the
> > > >>>>>>>>> most
> > > >>>>>>>>> when designing the FLIP ;-)
> > > >>>>>>>>>
> > > >>>>>>>>> - We also thought about the syntactic shortcut for running
> > > >>>>>>>>> aggregates
> > > >>>>>>>>> like
> > > >>>>>>>>> you proposed (table.groupBy(‘a).select(…)). Our motivation to
> > not
> > > >>>>>>>>> allow
> > > >>>>>>>>> this shortcut is to prevent users from accidentally
> performing
> > a
> > > >>>>>>>>> "dangerous" operation. The problem with unbounded sliding
> > > >>>>>>>>> row-windows is
> > > >>>>>>>>> that their state does never expire. If you have an evolving
> key
> > > >>>>>>>>> space,
> > > >>>>>>>>> you
> > > >>>>>>>>> will likely run into problems at some point because the
> > operator
> > > >>>>>>>>> state
> > > >>>>>>>>> grows too large. IMO, a row-window session is a better
> > approach,
> > > >>>>>>>>> because it
> > > >>>>>>>>> defines a timeout after which state can be discarded.
> > > >>>>>>>>> groupBy.select is
> > > >>>>>>>>> a
> > > >>>>>>>>> very common operation in batch but its semantics in streaming
> > are
> > > >>>>>>>>> very
> > > >>>>>>>>> different. In my opinion it makes sense to make users aware
> of
> > > >>>>>>>>> these
> > > >>>>>>>>> differences through the API.
> > > >>>>>>>>>
> > > >>>>>>>>> - Reassigning timestamps and watermarks is a very delicate
> > issue.
> > > >>>>>>>>> You
> > > >>>>>>>>> are
> > > >>>>>>>>> right, that Calcite exposes this field which is necessary due
> > to
> > > >>>>>>>>> the
> > > >>>>>>>>> semantics of SQL. However, also in Calcite you cannot freely
> > > >> choose
> > > >>>>>>>>> the
> > > >>>>>>>>> timestamp attribute for streaming queries (it must be a
> > monotone
> > > >> or
> > > >>>>>>>>> quasi-monotone attribute) which is hard to reason about (and
> > > >>>>>>>>> guarantee)
> > > >>>>>>>>> after a few operators have been applied. Streaming tables in
> > > Flink
> > > >>>>>>>>> will
> > > >>>>>>>>> likely have a time attribute which is identical to the
> initial
> > > >>>>> rowtime.
> > > >>>>>>>>> However, Flink does modify timestamps internally, e.g., for
> > > >> records
> > > >>>>>>>>> that
> > > >>>>>>>>> are emitted from time windows, in order to ensure that
> > > consecutive
> > > >>>>>>>>> windows
> > > >>>>>>>>> perform as expected. Modify or reassign timestamps in the
> > middle
> > > >> of
> > > >>>>>>>>> a
> > > >>>>>>>>> job
> > > >>>>>>>>> can result in unexpected results which are very hard to
> reason
> > > >>>>>>>>> about. Do
> > > >>>>>>>>> you have a concrete use case in mind for reassigning
> > timestamps?
> > > >>>>>>>>>
> > > >>>>>>>>> - The idea to represent rowtime and systime as object is
> good.
> > > Our
> > > >>>>>>>>> motivation to go for reserved Scala symbols was to have a
> > uniform
> > > >>>>>>>>> syntax
> > > >>>>>>>>> with windows over streaming and batch tables. On batch tables
> > you
> > > >>>>>>>>> can
> > > >>>>>>>>> compute time windows basically over every time attribute
> (they
> > > are
> > > >>>>>>>>> treated
> > > >>>>>>>>> similar to grouping attributes with a bit of extra logic to
> > > >> extract
> > > >>>>>>>>> the
> > > >>>>>>>>> grouping key for sliding and session windows). If you write
> > > >>>>>>>>> window(Tumble
> > > >>>>>>>>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime
> > would
> > > >>>>>>>>> indicate
> > > >>>>>>>>> event-time. On a batch table with a 'rowtime attribute, the
> > same
> > > >>>>>>>>> operator
> > > >>>>>>>>> would be internally converted into a group by. By going for
> the
> > > >>>>>>>>> object
> > > >>>>>>>>> approach we would lose this compatibility (or would need to
> > > >>>>>>>>> introduce an
> > > >>>>>>>>> additional column attribute to specifiy the window attribute
> > for
> > > >>>>>>>>> batch
> > > >>>>>>>>> tables).
> > > >>>>>>>>>
> > > >>>>>>>>> As usual some of the design decisions are based on
> preferences.
> > > >>>>>>>>> Do they make sense to you? Let me know what you think.
> > > >>>>>>>>>
> > > >>>>>>>>> Best, Fabian
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <
> wuchong.wc@alibaba-inc.com
> > <ma
> > > >>>>> ilto:wuchong.wc@alibaba-inc.com> <mailto:
> > > >>>>>
> > > >>>>>>>>> wuchong.wc@alibaba-inc.com<mailto:wuchong.wc@alibaba-inc.com
> > >>>:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I'm on vacation for about five days , sorry to have missed
> > this
> > > >>>>>>>>>> great
> > > >>>>>>>>>> FLIP.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Yes, the non-windowed aggregates is a special case of
> > > row-window.
> > > >>>>>>>>>> And
> > > >>>>>>>>>> the
> > > >>>>>>>>>> proposal looks really good.  Can we have a simplified form
> for
> > > >> the
> > > >>>>>>>>>> special
> > > >>>>>>>>>> case? Such as : table.groupBy(‘a).rowWindow(Sl
> > > >>>>>>>>>> ideRows.unboundedPreceding).select(…)
> > > >>>>>>>>>> can be simplified to  table.groupBy(‘a).select(…). The
> latter
> > > >> will
> > > >>>>>>>>>> actually
> > > >>>>>>>>>> call the former.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Another question is about the rowtime. As the FLIP said,
> > > >>>>>>>>>> DataStream and
> > > >>>>>>>>>> StreamTableSource is responsible to assign timestamps and
> > > >>>>>>>>>> watermarks,
> > > >>>>>>>>>> furthermore “rowtime” and “systemtime” are not real column.
> > IMO,
> > > >>>>>>>>>> it is
> > > >>>>>>>>>> different with Calcite’s rowtime, which is a real column in
> > the
> > > >>>>> table.
> > > >>>>>>>>>> In
> > > >>>>>>>>>> FLIP's way, we will lose some flexibility. Because the
> > timestamp
> > > >>>>>>>>>> column may
> > > >>>>>>>>>> be created after some transformations or join operation, not
> > > >>>>>>>>>> created at
> > > >>>>>>>>>> beginning. So why do we have to define rowtime at beginning?
> > > >>>>>>>>>> (because
> > > >>>>>>>>>> of
> > > >>>>>>>>>> watermark?)     Can we have a way to define rowtime after
> > source
> > > >>>>>>>>>> table
> > > >>>>>>>>>> like
> > > >>>>>>>>>> TimestampAssinger?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Regarding to “allowLateness” method. I come up a trick that
> we
> > > >> can
> > > >>>>>>>>>> make
> > > >>>>>>>>>> ‘rowtime and ‘system to be a Scala object, not a symbol
> > > >> expression.
> > > >>>>>>>>>> The API
> > > >>>>>>>>>> will looks like this :
> > > >>>>>>>>>>
> > > >>>>>>>>>> window(Tumble over 10.minutes on rowtime allowLateness as
> ‘w)
> > > >>>>>>>>>>
> > > >>>>>>>>>> The implementation will look like this:
> > > >>>>>>>>>>
> > > >>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > > >>>>>>>>>>  def on(time: rowtime.type): TumblingEventTimeWindow =
> > > >>>>>>>>>>      new TumblingEventTimeWindow(alias, ‘rowtime, size)
> > > >> //
> > > >>>>>>>>>> has
> > > >>>>>>>>>> allowLateness() method
> > > >>>>>>>>>>
> > > >>>>>>>>>>  def on(time: systemtime.type):
> TumblingProcessingTimeWindow=
> > > >>>>>>>>>>     new TumblingProcessingTimeWindow(alias, ‘systemtime,
> > size)
> > > >>>>>>>>>> // hasn’t allowLateness() method
> > > >>>>>>>>>> }
> > > >>>>>>>>>> object rowtime
> > > >>>>>>>>>> object systemtime
> > > >>>>>>>>>>
> > > >>>>>>>>>> What do you think about this?
> > > >>>>>>>>>>
> > > >>>>>>>>>> - Jark Wu
> > > >>>>>>>>>>
> > > >>>>>>>>>> 在 2016年9月6日,下午11:00,Timo Walther <twalthr@apache.org
> <mailto:
> > twa
> > > >>>>> lthr@apache.org> <mailto:
> > > >>>>>
> > > >>>>>>>>>>> twalthr@apache.org<ma...@apache.org>>> 写道:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I thought about the API of the FLIP again. If we allow the
> > > >>>>>>>>>>> "systemtime"
> > > >>>>>>>>>>>
> > > >>>>>>>>>> attribute, we cannot implement a nice method chaining where
> > the
> > > >>>>>>>>>> user
> > > >>>>>>>>>> can
> > > >>>>>>>>>> define a "allowLateness" only on event time. So even if the
> > user
> > > >>>>>>>>>> expressed
> > > >>>>>>>>>> that "systemtime" is used we have to offer a "allowLateness"
> > > >>>>>>>>>> method
> > > >>>>>>>>>> because
> > > >>>>>>>>>> we have to assume that this attribute can also be the batch
> > > event
> > > >>>>>>>>>> time
> > > >>>>>>>>>> column, which is not very nice.
> > > >>>>>>>>>>
> > > >>>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > > >>>>>>>>>>> def on(timeField: Expression): TumblingEventTimeWindow =
> > > >>>>>>>>>>>   new TumblingEventTimeWindow(alias, timeField, size) //
> has
> > > >>>>>>>>>>>
> > > >>>>>>>>>> allowLateness() method
> > > >>>>>>>>>>
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Timo
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Am 05/09/16 um 10:41 schrieb Fabian Hueske:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> you had asked for non-windowed aggregates in the Table
> API a
> > > >> few
> > > >>>>>>>>>>>> times.
> > > >>>>>>>>>>>> FLIP-11 proposes row-window aggregates which are a
> > > >>>>>>>>>>>> generalization of
> > > >>>>>>>>>>>> running aggregates (SlideRow unboundedPreceding).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Can you have a look at the FLIP and give feedback whether
> > this
> > > >>>>>>>>>>>> is
> > > >>>>>>>>>>>> what
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> you
> > > >>>>>>>>>>
> > > >>>>>>>>>>> are looking for?
> > > >>>>>>>>>>>> Improvement suggestions are very welcome as well.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>> Fabian
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 2016-09-01 16<tel:2016-09-01%C2%A016>:12 GMT+02:00 Timo
> > > Walther
> > > >>>>> <tw...@apache.org> <mailto:
> > > >>>>>>>>>>>> twalthr@apache.org<ma...@apache.org>>>:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi all!
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in
> > the
> > > >>>>>>>>>>>>> Table
> > > >>>>>>>>>>>>> API.
> > > >>>>>>>>>>>>> You can find the FLIP-11 here:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 11%
> > <h
> > > >>>>> ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25> <
> > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 11%
> > <h
> > > >>>>> ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25>>
> > > >>>>>
> > > >>>>>>>>>>>>> 3A+Table+API+Stream+Aggregations
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Motivation for the FLIP:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> The Table API is a declarative API to define queries on
> > > static
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>> streaming tables. So far, only projection, selection, and
> > > >> union
> > > >>>>>>>>>>>>> are
> > > >>>>>>>>>>>>> supported operations on streaming tables.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> This FLIP proposes to add support for different types of
> > > >>>>>>>>>>>>> aggregations
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>> on
> > > >>>>>>>>>>
> > > >>>>>>>>>>> top of streaming tables. In particular, we seek to support:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> - Group-window aggregates, i.e., aggregates which are
> > > computed
> > > >>>>>>>>>>>>> for a
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>> group
> > > >>>>>>>>>>
> > > >>>>>>>>>>> of elements. A (time or row-count) window is required to
> > bound
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>> infinite
> > > >>>>>>>>>>
> > > >>>>>>>>>>> input stream into a finite group.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> - Row-window aggregates, i.e., aggregates which are
> > computed
> > > >>>>>>>>>>>>> for
> > > >>>>>>>>>>>>> each
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>> row,
> > > >>>>>>>>>>
> > > >>>>>>>>>>> based on a window (range) of preceding and succeeding rows.
> > > >>>>>>>>>>>>> Each type of aggregate shall be supported on
> keyed/grouped
> > or
> > > >>>>>>>>>>>>> non-keyed/grouped data streams for streaming tables as
> well
> > > as
> > > >>>>>>>>>>>>> batch
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>> tables.
> > > >>>>>>>>>>
> > > >>>>>>>>>>> We are looking forward to your feedback.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Timo
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>> --
> > > >>>>>>>>>>> Freundliche Grüße / Kind Regards
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Timo Walther
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Follow me: @twalthr
> > > >>>>>>>>>>> https://www.linkedin.com/in/twalthr
> > > >>>>>>>>>>> <https://www.linkedin.com/in/t
> > > >>>>>>>>>>> walthr>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Freundliche Grüße / Kind Regards
> > > >>>>>>
> > > >>>>>> Timo Walther
> > > >>>>>>
> > > >>>>>> Follow me: @twalthr
> > > >>>>>> https://www.linkedin.com/in/twalthr
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>