Posted to dev@flink.apache.org by Xingcan Cui <xi...@gmail.com> on 2017/08/01 04:04:46 UTC

Re: [DISCUSS] Table API / SQL internal timestamp handling

Hi Shaoxuan,

I really appreciate your prompt reply. What you explained makes sense to me.

There is only one point on which I see things differently: "we have to
buffer all the delta data between watermarks of two inputs".
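
To make sure we mean the same thing, here is how I read that statement, as a
small Python sketch. The function names are made up for illustration and are
not Flink APIs, and the timestamps are plain numbers (hours).

```python
def operator_watermark(left_wm, right_wm):
    """A two-input operator can only advance to the slower input's watermark."""
    return min(left_wm, right_wm)


def delta_rows(faster_input_ts, left_wm, right_wm):
    """Timestamps from the faster input that lie between the two watermarks.

    These rows are complete on their own side, but they cannot be finalized
    until the slower input catches up, so they have to stay buffered.
    """
    low = operator_watermark(left_wm, right_wm)
    high = max(left_wm, right_wm)
    return [ts for ts in faster_input_ts if low < ts <= high]


# The left input is at "today" (hour 48), the right one at "yesterday" (hour 24).
combined = operator_watermark(48, 24)
buffered = delta_rows([10, 30, 40, 48], 48, 24)
```

The wider the gap between the two watermarks, the more rows end up in
`buffered`.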

Consider the following SQL on joining two streams l and r:

SELECT * FROM l, r
WHERE l.name = r.name
AND l.ts BETWEEN r.ts - INTERVAL '600' MINUTE
    AND r.ts - INTERVAL '599' MINUTE;

This query is valid since it contains both an equi-join key and a time-range
restriction.
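
Spelled out, the predicate says that a row of l with timestamp t can only
match rows of r whose timestamps lie between t + 599 and t + 600 minutes. A
minimal Python sketch of the condition, assuming rows are dicts and ts is a
plain number of minutes (both are my assumptions, only for illustration):

```python
def matches(l_row, r_row):
    """Join condition of the query: same name, and l.ts within the bounds
    [r.ts - 600, r.ts - 599]. The ts values are assumed to be minutes."""
    same_key = l_row["name"] == r_row["name"]
    in_range = r_row["ts"] - 600 <= l_row["ts"] <= r_row["ts"] - 599
    return same_key and in_range


l_row = {"name": "a", "ts": 0}
# Try r rows around the matching range for an l row at minute 0.
hits = [r_ts for r_ts in (598, 599, 600, 601)
        if matches(l_row, {"name": "a", "ts": r_ts})]
```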

There are two different situations in which the query may be executed: (1) if
the timestamps of l and r are synchronized, e.g., both contain newly
generated events, we must buffer the l stream for 600 minutes; and (2) if
there is a natural offset between the two streams, e.g., the r stream is newly
generated while the l stream is sourced from an event queue generated 10 hours
ago, it is unnecessary to buffer so much data.
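
The arithmetic behind the two situations can be sketched in Python. This is
only a back-of-the-envelope model under my own assumptions: timestamps are
minutes, a row arrives when its own stream's watermark equals its timestamp,
and both watermarks advance at wall-clock speed. The names are hypothetical.

```python
UPPER = 600  # an l row with timestamp t matches r rows with ts <= t + 600
LOWER = 599  # an r row with timestamp s matches l rows with ts <= s - 599


def l_buffer_minutes(l_ts, r_watermark):
    """How long an l row stays buffered until r's watermark passes l_ts + 600."""
    return max(0, l_ts + UPPER - r_watermark)


def r_buffer_minutes(r_ts, l_watermark):
    """How long an r row stays buffered until l's watermark passes r_ts - 599."""
    return max(0, r_ts - LOWER - l_watermark)


# (1) Synchronized: both watermarks are at minute 1000 when the rows arrive.
synced = (l_buffer_minutes(1000, 1000), r_buffer_minutes(1000, 1000))
# (2) l lags r by 10 hours: l is at minute 1000 while r is already at 1600.
offset = (l_buffer_minutes(1000, 1600), r_buffer_minutes(1600, 1000))
```

In case (1) the l row waits the full 600 minutes; in case (2) neither side
has to hold a row for more than about a minute.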

That raises the question: what if the timestamps of the two streams are
essentially "irreconcilable"?

Best,
Xingcan

On Mon, Jul 31, 2017 at 10:42 PM, Shaoxuan Wang <ws...@gmail.com> wrote:

> Xingcan,
> A watermark is the “estimate of completion”. The user defines the watermark
> for each input based on the best estimate of when that input has seen pretty
> much all of its data. It is usually calculated from the event timestamp.
> When we do a windowed join, we have to make sure the watermarks of both
> inputs have been received before emitting a window result at this watermark.
> If the two inputs differ widely, say "one for today and the other one
> for yesterday" as you pointed out, the watermark for the windowed join
> operator is just yesterday.  I guess this is what Fabian means by "In case of
> a join, the smallest future timestamp depends on two fields and not just on
> one." In the windowed join case, we have to buffer all the delta data
> between watermarks of two inputs. It is the user's responsibility (if
> she/he wants to reduce the cost) to align the watermarks of the stream
> sources as much as possible.
>
> Regards,
> Shaoxuan
>
>
> On Mon, Jul 31, 2017 at 10:09 PM, Xingcan Cui <xi...@gmail.com> wrote:
>
> > Hi Fabian,
> >
> > I have a question similar to Jark's. Theoretically, the row times of two
> > streams could be quite different, e.g., one for today and the other one for
> > yesterday. How can we align them?
> >
> > Best,
> > Xingcan
> >
> > On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fh...@gmail.com> wrote:
> >
> > > Hi Jark,
> > >
> > > yes, the handling of watermarks is very tricky. It is not directly related
> > > to the proposal which is only about the representation of timestamps but
> > > becomes important for event-time joins.
> > > We have a JIRA about an operator that is able to hold back watermarks [1].
> > >
> > > Roughly the idea is to track the smallest timestamp that will be emitted in
> > > the future and align the watermark to this timestamp.
> > > For this we need to know the semantics of the operator (which timestamp
> > > will be emitted in the future) but this will be given for relational
> > > operators.
> > > The new operator could emit a watermark whenever it received one.
> > >
> > > In case of a join, the smallest future timestamp depends on two fields and
> > > not just on one.
> > >
> > > Best,
> > > Fabian
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-7245
> > >
> > >
> > > 2017-07-31 14:35 GMT+02:00 Jark Wu <ja...@apache.org>:
> > >
> > > > Hi,
> > > >
> > > > @Fabian, I read your proposal carefully again, and I'm a big +1 to do
> > > > it. The proposal can address the problem of how to forward both input
> > > > tables' rowtime in a dual-stream join (windowed/non-windowed). The
> > > > additional payload drawback is acceptable.
> > > >
> > > > You mentioned that:
> > > >
> > > > > The query operators ensure that the watermarks are always behind all
> > > > > event-time timestamps. With additional analysis we will be able to
> > > > > restrict this to timestamps that are actually used as such.
> > > >
> > > > I'm more curious about how we can define the watermark strategies in
> > > > order to make sure all timestamp columns are aligned to watermarks,
> > > > especially when the watermark has been defined in the input DataStream.
> > > >
> > > > Bests,
> > > > Jark Wu
> > > >
> > > >
> > > > 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xi...@gmail.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for the answers, @Fabian.
> > > > >
> > > > > @Jark, at first I also wanted the users to reassign the timestamp field
> > > > > arbitrarily. However, that means we have to break the current "time
> > > > > system" and create a new one. The blocked watermarks become meaningless
> > > > > and maybe a new WatermarkAssigner should be provided. A slightly
> > > > > stricter mechanism would be to only allow the use of existing timestamp
> > > > > fields. It sounds reasonable, but will bring an unnecessary barrier
> > > > > between stream and batch SQL, i.e., some SQL that works for batch cannot
> > > > > be executed in the stream environment. I just wonder if we could
> > > > > automatically choose a field, which will be used in the following
> > > > > calculations. Not sure if it makes sense.
> > > > >
> > > > > @Shaoxuan @Radu, I totally agree that "proctime" is the main obstacle
> > > > > to consolidating stream/batch SQL. Though from a general point of view
> > > > > it can indicate the time to some extent, its randomness means that it
> > > > > should never be used in time-sensitive applications. I have always
> > > > > believed that all the information used for query evaluation should be
> > > > > acquired from the data itself.
> > > > >
> > > > > Best,
> > > > > Xingcan
> > > > >
> > > > > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fh...@gmail.com> wrote:
> > > > >
> > > > > > Hi Shaoxuan,
> > > > > >
> > > > > > thanks for your comments. I agree with your comment:
> > > > > >
> > > > > > > The problem we used to have is that we have treated eventtime column
> > > > > > > as a special timestamp column.
> > > > > >
> > > > > > IMO, an event-time timestamp column is a regular column that is aligned
> > > > > > with the watermarks of the stream.
> > > > > > In order to distinguish watermark-aligned columns from others, we need a
> > > > > > special flag in the schema.
> > > > > > When a timestamp column is modified and we cannot guarantee that it is
> > > > > > still aligned with the watermarks, it must lose the special flag and be
> > > > > > treated like any other column.
> > > > > >
> > > > > > Regarding your comments:
> > > > > > 1) I agree that we can use Long in addition to Timestamp for timestamp
> > > > > > columns. Since timestamp columns need to be comparable to watermarks,
> > > > > > which are Longs, I don't see that other types would make sense. For now,
> > > > > > I would keep the restriction that timestamps can only be of Timestamp
> > > > > > type. I think extending this to Long would be a follow-up issue to the
> > > > > > changes I proposed here.
> > > > > > 2) Relates to 1) and I agree. If we use a Long attribute as a timestamp,
> > > > > > it should remain of type Long. For now I would keep converting it to
> > > > > > Timestamp and change that later.
> > > > > > 3) Yes, timestamp columns must be aligned to watermarks. That's their
> > > > > > primary characteristic. How to define watermark strategies is orthogonal
> > > > > > to this discussion, IMO.
> > > > > > 4) From my point of view, proc-time is a purely virtual column and not
> > > > > > related to an actual (data) column. However, it must be part of the
> > > > > > schema and treated like any other attribute for a good user experience
> > > > > > and SQL compliance. In order to be able to join two tables on processing
> > > > > > time, it must be possible to include a processing time column in the
> > > > > > schema definition of the table. Processing time queries can never
> > > > > > compute the same results as batch queries but their semantics should be
> > > > > > aligned with event-time queries.
> > > > > >
> > > > > > Best, Fabian
> > > > > >
> > > > > > 2017-07-27 9:47 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > @Shaoxuan - thanks for the remarks. I have a question regarding your
> > > > > > > suggestion not to allow creating a proctime window on a regular
> > > > > > > column. I think this would be useful though. First, you might need to
> > > > > > > carry the timestamp indicating when the processing happened (for
> > > > > > > logging, provenance, traceability ...). Second, I do not think it
> > > > > > > contradicts the semantics of batch SQL, as in SQL you have the
> > > > > > > function "now()", which carries pretty much the same semantics as
> > > > > > > having a function to mark the proctime and then projecting this into
> > > > > > > a column. If I am not mistaken, you can store the result of calling
> > > > > > > now() in database columns.
> > > > > > >
> > > > > > >
> > > > > > > Dr. Radu Tudoran
> > > > > > > Staff Research Engineer - Big Data Expert
> > > > > > > IT R&D Division
> > > > > > >
> > > > > > >
> > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > German Research Center
> > > > > > > Munich Office
> > > > > > > Riesstrasse 25, 80992 München
> > > > > > >
> > > > > > > E-mail: radu.tudoran@huawei.com
> > > > > > > Mobile: +49 15209084330
> > > > > > > Telephone: +49 891588344173
> > > > > > >
> > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> > > 56063,
> > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> > > 56063,
> > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > This e-mail and its attachments contain confidential
> information
> > > from
> > > > > > > HUAWEI, which is intended only for the person or entity whose
> > > address
> > > > > is
> > > > > > > listed above. Any use of the information contained herein in
> any
> > > way
> > > > > > > (including, but not limited to, total or partial disclosure,
> > > > > > reproduction,
> > > > > > > or dissemination) by persons other than the intended
> recipient(s)
> > > is
> > > > > > > prohibited. If you receive this e-mail in error, please notify
> > the
> > > > > sender
> > > > > > > by phone or email immediately and delete it!
> > > > > > >
> > > > > > >
> > > > > > > -----Original Message-----
> > > > > > > From: Shaoxuan Wang [mailto:shaoxuan@apache.org]
> > > > > > > Sent: Thursday, July 27, 2017 6:00 AM
> > > > > > > To: Dev
> > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
> > handling
> > > > > > >
> > > > > > > > Hi Everyone,
> > > > > > > > I like this proposal. The problem we used to have is that we have
> > > > > > > > treated eventtime column as a special timestamp column. An eventtime
> > > > > > > > column is no different from any other regular column, except for a
> > > > > > > > certain flag (eventtime-indicator) indicating that this column can be
> > > > > > > > used as an eventtime to decide when a bounded query can emit the
> > > > > > > > final result by comparing with the associated watermark.
> > > > > > > >
> > > > > > > > I have a few comments to add on top of this (they may have already
> > > > > > > > been addressed in the conversation; since it's a long discussion, I
> > > > > > > > may have missed something):
> > > > > > > >
> > > > > > > >    1. While we remove the timestamp column, we introduce an
> > > > > > > >    eventtime-indicator (we may already have this concept). It is only
> > > > > > > >    a flag that can be applied to any column (note that some types may
> > > > > > > >    not be usable as an eventtime column), indicating whether this
> > > > > > > >    column can be used as eventtime or not. This flag is useful for
> > > > > > > >    validation and codeGen.
> > > > > > > >    2. A column that has been used as an eventtime should not lose its
> > > > > > > >    own type. We should not cast all eventtime columns to the timestamp
> > > > > > > >    type. For instance, if a column is of long type, it will stay a
> > > > > > > >    long type even if a window aggregate has used it as an eventtime.
> > > > > > > >    3. Eventtime will only work well with some associated watermark
> > > > > > > >    strategy. We may consider forcing the user to provide watermark
> > > > > > > >    logic for his/her selected eventtime.
> > > > > > > >    4. For proctime, I hope we do not introduce a proctime-indicator
> > > > > > > >    for regular columns. Ideally we should not allow the user to create
> > > > > > > >    a proctime window on a regular column, as this is against the batch
> > > > > > > >    query semantics. Therefore I suggest we should always introduce a
> > > > > > > >    proctime timestamp column for users to create a proctime window.
> > > > > > > >    And unlike eventtime, proctime does not need any associated
> > > > > > > >    watermark strategy, as there is no out-of-order issue for proctime.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Shaoxuan
> > > > > > >
> > > > > > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks everybody for the replies so far.
> > > > > > > >
> > > > > > > > Let me answer your questions and reply to your thoughts:
> > > > > > > >
> > > > > > > > Radu:
> > > > > > > > ---
> > > > > > > > > First of all, although my proposal is motivated by a join
> > > operator,
> > > > > > > > this discussion is about timestamp handling, not about joins
> in
> > > > > > general.
> > > > > > > >
> > > > > > > > - The semantics of outer joins is to emit null and there is
> no
> > > way
> > > > > > > > around that. This is not an issue for us. Actually, outer
> joins
> > > are
> > > > > > > > supported by the batch SQL / Table API. It is true that outer
> > > joins
> > > > > > > > might result in null timestamps. Calcite will mark those
> fields
> > > as
> > > > > > > > nullable and we should check that timestamps which are used
> in
> > > > > windows
> > > > > > > or joins are not nullable.
> > > > > > > > - The query has to explicitly specify which timestamp
> attribute
> > > to
> > > > > use.
> > > > > > > > Otherwise its semantics are not complete and it is invalid. A
> > > > > > > > group-window that follows a join will reference a timestamp
> > > > attribute
> > > > > > > > and this will be used. The other timestamp might be projected
> > > out.
> > > > > > > > When a result with two timestamps is converted into a
> > DataStream,
> > > > the
> > > > > > > > user has to decide. This could be done inside of the Table to
> > > > > > > > DataStream conversion. If the Table has more than one valid
> > > > > timestamp,
> > > > > > > > the conversion will ask which timestamp to forward.
> > > > > > > > - A proctime join should forward all proctime attributes of
> the
> > > > input
> > > > > > > > tables. All will be the same, but that does not matter
> because
> > > they
> > > > > > > > are either virtual or represented as 1 byte dummy attributes.
> > > Also,
> > > > > > > > unused ones will be automatically projected out anyway.
> > > > > > > > - An event-time join should forward all event-time attributes
> > of
> > > > the
> > > > > > > > input tables. Creating a new event-time attribute using
> > > processing
> > > > > > > > time makes event-time processing pointless and will give
> > > completely
> > > > > > > random results.
> > > > > > > > Event-time is not about the "time an event is created" but
> > about
> > > a
> > > > > > > > timestamp that is associated with an event. For example an
> > order
> > > > > event
> > > > > > > > could have three timestamps: "orderTime", "shipTime", and
> > > > > > "receiveTime".
> > > > > > > > Each could be a valid event-time attribute.
> > > > > > > >
> > > > > > > > Jark:
> > > > > > > > ---
> > > > > > > > Thanks for the proposal. I think I understand what you want
> to
> > > > > achieve
> > > > > > > > with this, but I think functions to instantiate time
> attributes
> > > are
> > > > > > > > not necessary and would make things more complicated. The
> point
> > > of
> > > > > > > > supporting multiple time attributes is to ensure that all of
> > them
> > > > are
> > > > > > > > > aligned with the watermarks. If we add a method ROW_TIME(timestamp),
> > > > > > > > > we don't know if the timestamp is aligned with the watermarks. If
> > > > > > > > > that is not the case, the query won't be executed as expected. The
> > > > > > > > > issue of LEFT JOIN can easily be addressed by checking for
> > > > > > > > > nullability during optimization when an operator tries to use it.
> > > > > > > >
> > > > > > > > The beauty of supporting multiple timestamps is that a user
> > does
> > > > not
> > > > > > > > have to care at all about timestamps (or timestamp functions)
> > and
> > > > > > > > watermarks. As long as the query uses a timestamp attribute
> > that
> > > > was
> > > > > > > > originally declared as rowtime in a source table (and was not
> > > > > modified
> > > > > > > > afterwards), this is fine. Think of a cascade of three
> windowed
> > > > > joins:
> > > > > > > > R - S - T - U, and you want to join S - T first. In that
> case,
> > > you
> > > > > > > > need to preserve the timestamps of S and T in order to join R
> > and
> > > > U.
> > > > > > > > From a relational algebra point of view, there is no reason
> to
> > > > have a
> > > > > > > > limitation on how these attributes are accessed. Timestamps
> are
> > > > just
> > > > > > > > regular fields of a record. The only restriction in the
> context
> > > of
> > > > > > > > stream processing is that the watermark must be aligned with
> > > > > > > > timestamps, i.e., follow all timestamps such that data is not
> > > late
> > > > > > > > according to any of the timestamps. This we can achieve and
> > > handle
> > > > > > > internally without the user having to worry about it.
> > > > > > > >
> > > > > > > > Xingcan:
> > > > > > > > ---
> > > > > > > > I think your questions are mostly implementation details and
> > not
> > > so
> > > > > > > > much related to the original proposal of supporting multiple
> > > > > > timestamps.
> > > > > > > >
> > > > > > > > My take on your questions is:
> > > > > > > > 1. The rate at which watermarks are emitted is not important
> > for
> > > > the
> > > > > > > > correctness of a query. However, it can affect the
> performance,
> > > > > > > > because each watermark is sent as a special record and it is
> > > > > > > > broadcasted. My initial take would be to emit a new watermark
> > > > > whenever
> > > > > > > > the operator updated its watermark because usually, the
> > operator
> > > > > would
> > > > > > > > have forwarded the old watermark.
> > > > > > > > 2. I would say this is the responsibility of the operator
> > because
> > > > > > > > first it is not related to the semantics of the query and
> > second
> > > it
> > > > > is
> > > > > > > > an operator responsibility in the existing code as well.
> > > > > > > >
> > > > > > > > Jark 2:
> > > > > > > > You are right, the query (or user) must decide on the
> > event-time
> > > > > > > > attribute to use. My main point is, it is much easier for the
> > > user
> > > > > > > > (and for us
> > > > > > > > internally) if we internally track multiple timestamps.
> Because
> > > we
> > > > do
> > > > > > > > not have to prune the timestamp that will not be later used
> > into
> > > > the
> > > > > > > join.
> > > > > > > > Moreover, both timestamps might be used later (see join
> > example,
> > > > > which
> > > > > > > > could be reordered of course). All we have to do is to ensure
> > > that
> > > > > all
> > > > > > > > timestamps are aligned with the watermarks.
> > > > > > > >
> > > > > > > > Radu 2:
> > > > > > > > IMO, time (or anything else that affects the semantics)
> should
> > > > never
> > > > > > > > be decided by the system. When we would do that, a query is
> not
> > > > fully
> > > > > > > > specified or, even worse, the way it is executed is
> > semantically
> > > > > > > > incorrect and produces arbitrary results.
> > > > > > > >
> > > > > > > > Time attributes should be specified in the source tables and
> > then
> > > > > > > > forwarded from there. So far I haven't seen an example where
> > this
> > > > > > > > would not be possible (within the semantics or relational
> > > queries).
> > > > > If
> > > > > > > > we do that right, there won't be a need for explicit time
> > > > management
> > > > > > > > except for the definition of the initial timestamps which can
> > be
> > > > > > > > hidden in the table definition. As I said before, we (or the
> > > > system)
> > > > > > > > cannot decide on the timestamp because that would lead to
> > > arbitrary
> > > > > > > > results. Asking the user to do that would mean explicit time
> > > > > > > > > management which is also not desirable. I think my proposal gives
> > > > > > > > > users all options (timestamps) to choose from and the system can do
> > > > > > > > > the rest.
> > > > > > > >
> > > > > > > > Best, Fabian
> > > > > > > >
> > > > > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <
> > radu.tudoran@huawei.com
> > > >:
> > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > I just want to add that I was referring to NULL values not
> > > > > > > > > specifically
> > > > > > > > to
> > > > > > > > > timefields but to the event itself. If you have the follow
> > > > > situation
> > > > > > > > >
> > > > > > > > > Stream 1:     .... |    event1   | ....
> > > > > > > > > Stream 2:     .... |             | ....
> > > > > > > > >
> > > > > > > > > And you have a LEFT JOIN between stream 1 and stream 2 (no
> > > > > > > > > condition)...then you still need to emit (event1,null) ...
> as
> > > > this
> > > > > > > > > is the behavior of left join. This is maybe a very simple
> > > > > situation,
> > > > > > > > > but the
> > > > > > > > point
> > > > > > > > > is that left joins and right joins can have situation when
> > you
> > > > have
> > > > > > > > > elements only in the main stream and no element in the
> right
> > > > > stream.
> > > > > > > > > And for this case you still need to emit.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Regarding whether time should be decided by the system or not... I
> > > > > > > > > > think the answer is: it depends. I think the example from Jark is
> > > > > > > > > > very good and shows the need for some mechanisms to select/manage
> > > > > > > > > > the time (I like the proposal of having functions to insert the
> > > > > > > > > > time in the output!). However, if a business analyst writes a query
> > > > > > > > > > without explicit time management, we still need to have some
> > > > > > > > > > default behavior in the system. As per my initial proposal, I think
> > > > > > > > > > we need to decide on one timestamp field to carry: either a new one
> > > > > > > > > > (at the moment of the join) or the timestamp from the main stream
> > > > > > > > > > (...although I am not sure which one is the main stream in the case
> > > > > > > > > > of a full join :) )
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Dr. Radu Tudoran
> > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D Division
> > > > > > > > >
> > > > > > > > > -----Original Message-----
> > > > > > > > > From: Jark Wu [mailto:jark@apache.org]
> > > > > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > > > > > > To: dev@flink.apache.org
> > > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
> > > > handling
> > > > > > > > >
> > > > > > > > > Hi Xingcan,
> > > > > > > > >
> > > > > > > > > IMO, I don't think event-time of join results could be
> > > > > automatically
> > > > > > > > > decided by system. Considering batch tables, if users want
> a
> > > > event
> > > > > > time
> > > > > > > > > window aggregation after join, user must specify the time
> > field
> > > > > > > > explicitly
> > > > > > > > > (T1.rowtime or T2.rowtime or the computed result of them).
> So
> > > in
> > > > > the
> > > > > > > case
> > > > > > > > > of streaming tables, the system also can't automatically
> > decide
> > > > the
> > > > > > > time
> > > > > > > > > field for users.
> > > > > > > > >
> > > > > > > > > In regards to the question you asked, I think we don't need
> > to
> > > > > change
> > > > > > > the
> > > > > > > > > watermark no matter we choose the left rowtime or right
> > rowtime
> > > > or
> > > > > > the
> > > > > > > > > combination. Because the watermark has been aligned with
> the
> > > > > rowtime
> > > > > > in
> > > > > > > > the
> > > > > > > > > source. Maybe I'm wrong about this, please correct me if
> I'm
> > > > > missing
> > > > > > > > > something.
> > > > > > > > >
> > > > > > > > > What do you think?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Jark
> > > > > > > > >
> > > > > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingcanc@gmail.com
> >:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > @Fabian, thanks for raising this.
> > > > > > > > > >
> > > > > > > > > > @Radu and Jark, personally I think the timestamp field is
> > > > > critical
> > > > > > > for
> > > > > > > > > > query processing and thus should be declared as (or
> > supposed
> > > to
> > > > > be)
> > > > > > > > > > NOT NULL. In addition, I think the event-time semantic of
> > the
> > > > > join
> > > > > > > > > > results should be automatically decided by the system,
> > i.e.,
> > > we
> > > > > do
> > > > > > > not
> > > > > > > > > > hand it over to users so to avoid some unpredictable
> > > > assignment.
> > > > > > > > > >
> > > > > > > > > > Generally speaking, consolidating different time fields
> is
> > > > > possible
> > > > > > > > > > since all of them should ideally be monotonically
> > increasing.
> > > > > From
> > > > > > my
> > > > > > > > > > point of view, the problem lies in
> > > > > > > > > > (1) what's the relationship between the old and new
> > > watermarks.
> > > > > > Shall
> > > > > > > > > > they be one-to-one mapping or the new watermarks could
> skip
> > > > some
> > > > > > > > > > timestamps? And (2) who is in charge of emitting the
> > blocked
> > > > > > > > > > watermarks, the operator or the process function?
> > > > > > > > > >
> > > > > > > > > > I'd like to hear from you.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xingcan
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <
> jark@apache.org
> > >
> > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > Radu's concerns make sense to me, especially the null
> > value
> > > > > > > > > > > timestamp and multi-proctime.
> > > > > > > > > > >
> > > > > > > > > > > I have also something in my mind. I would like to
> propose
> > > > some
> > > > > > time
> > > > > > > > > > > indicator built-in functions, e.g. ROW_TIME(Timestamp
> ts)
> > > > will
> > > > > > > > > > > generate a event time logical attribute, PROC_TIME()
> will
> > > > > > generate
> > > > > > > a
> > > > > > > > > > > processing time logical attribute. It is similar to
> > > > > > TUMBLE_ROWTIME
> > > > > > > > > > > proposed in this PR https://github.com/apache/
> > > > flink/pull/4199.
> > > > > > > These
> > > > > > > > > > > can be used in any queries, but there still can't be
> more
> > > > than
> > > > > > one
> > > > > > > > > > > rowtime attribute or more than one proctime attribute
> in
> > a
> > > > > table
> > > > > > > > > schema.
> > > > > > > > > > >
> > > > > > > > > > > The both selected timestamp fields from a JOIN query
> will
> > > be
> > > > > > > > > > materialized.
> > > > > > > > > > > If someone needs further down the computation based on
> > the
> > > > > event
> > > > > > > > > > > time,
> > > > > > > > > > they
> > > > > > > > > > > need to create a new time attribute using the
> > ROW_TIME(...)
> > > > > > > > > > > function. And this can also solve the null timestamp
> > > problem
> > > > in
> > > > > > > LEFT
> > > > > > > > > > > JOIN, because we
> > > > > > > > > > can
> > > > > > > > > > > use a user defined function to combine the two rowtimes
> > and
> > > > > make
> > > > > > > the
> > > > > > > > > > result
> > > > > > > > > > > as the event time attribute, e.g. SELECT
> > > > > ROW_TIME(udf(T1.rowtime,
> > > > > > > > > > > T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <
> > > > > radu.tudoran@huawei.com
> > > > > > >:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > I think this is an interesting discussion and I would
> > > like
> > > > to
> > > > > > add
> > > > > > > > > > > > some issues and give some feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > - For supporting the join we do not only need to
> think
> > of
> > > > the
> > > > > > > time
> > > > > > > > > > > > but also on the null values. For example if you have
> a
> > > LEFT
> > > > > (or
> > > > > > > > > > > > RIGHT) JOIN between items of 2 input streams, and the
> > > > > secondary
> > > > > > > > > > > > input is not
> > > > > > > > > > > available
> > > > > > > > > > > > you should still emit Row.of(event1, null)...as far
> as
> > I
> > > > know
> > > > > > if
> > > > > > > > > > > > you
> > > > > > > > > > need
> > > > > > > > > > > > to serialize/deserialize null values to send them
> they
> > do
> > > > not
> > > > > > > > > > > > work. So
> > > > > > > > > > we
> > > > > > > > > > > > > should include this scenario in the discussions.
> > > > > > > > > > > > > - If
> we
> > > will
> > > > > > have
> > > > > > > > > > > > multiple timestamp in an (output) event, one question
> > > > > > > > > > is
> > > > > > > > > > > > how to select afterwards which is the primary time
> > field
> > > on
> > > > > > which
> > > > > > > > > > > > to operate. When we describe a query we might be able
> > to
> > > > > > specify
> > > > > > > > > > > > (or we
> > > > > > > > > > get
> > > > > > > > > > > > this implicitly if we implement the carryon of the 2
> > > > > > timestamps)
> > > > > > > > > > Select
> > > > > > > > > > > > T1.rowtime, T2.rowtime ...but if the output of a
> query
> > is
> > > > the
> > > > > > > > > > > > input of
> > > > > > > > > > a
> > > > > > > > > > > > new processing pipeline, then, do we support
> generally
> > > also
> > > > > > that
> > > > > > > > > > > > the
> > > > > > > > > > > input
> > > > > > > > > > > > has 2 time fields? ...how do we deal with the 2 input
> > > > fields
> > > > > > > > > > > > (maybe I
> > > > > > > > > > am
> > > > > > > > > > > > missing something) further in the datastream pipeline
> > > that
> > > > we
> > > > > > > > > > > > build
> > > > > > > > > > based
> > > > > > > > > > > > on the output?
> > > > > > > > > > > > - For the case of proctime - do we need to carry 2
> > > > proctimes
> > > > > > (the
> > > > > > > > > > > > proctimes of the incoming events from each stream),
> or
> > 1
> > > > > > proctime
> > > > > > > > > > > > (as
> > > > > > > > > > we
> > > > > > > > > > > > operate on proctime and the combination of the 2
> inputs
> > > can
> > > > > be
> > > > > > > > > > considered
> > > > > > > > > > > > as a new event, the current proctime on the machine
> can
> > > be
> > > > > > > > > > > > considered
> > > > > > > > > > the
> > > > > > > > > > > > (proc)time reference for output event) or 3 proctimes
> > > (the
> > > > 2
> > > > > > > > > > > > proctimes
> > > > > > > > > > of
> > > > > > > > > > > > the input plus the proctime when the new event was
> > > > created)?
> > > > > > > > > > > > > - Similar to the point above, for event time (which I
> > am
> > > > > > > > > > > > understanding
> > > > > > > > > > as
> > > > > > > > > > > > the time when the event was created...or do we
> > understand
> > > > > them
> > > > > > as
> > > > > > > > > > > > a
> > > > > > > > > > time
> > > > > > > > > > > > > carried within the event?) - when we join 2 events and
> > > output
> > > > > an
> > > > > > > > > > > > event
> > > > > > > > > > that
> > > > > > > > > > > > is the result of the join - isn't this a new event
> > detached
> > > > > from
> > > > > > > the
> > > > > > > > > > > > source\input events? ... I would tend to say it is a
> > new
> > > > > event
> > > > > > > and
> > > > > > > > > > > > then
> > > > > > > > > > > as
> > > > > > > > > > > > for proctime the event time of the new event is the
> > > current
> > > > > > time
> > > > > > > > > > > > when
> > > > > > > > > > > this
> > > > > > > > > > > > output event was created. If we would accept this
> > > > hypothesis
> > > > > > then
> > > > > > > > > > > > we
> > > > > > > > > > > would
> > > > > > > > > > > > not need the 2 time input fields to be
> carried/managed
> > > > > > > implicitly.
> > > > > > > > > > > > If someone needs further down the computation
> pipeline,
> > > > then
> > > > > in
> > > > > > > > > > > > the query
> > > > > > > > > > > they
> > > > > > > > > > > > would be selected explicitly from the input stream
> and
> > > > > > projected
> > > > > > > > > > > > in
> > > > > > > > > > some
> > > > > > > > > > > > fields to be carried (Select T1.rowtime as
> FormerTime1,
> > > > > > > T2.rowtime
> > > > > > > > > > > > as FormerTime2, .... JOIN T1, T2...)...but they would
> > not
> > > > > have
> > > > > > > the
> > > > > > > > > > timestamp
> > > > > > > > > > > > logic
> > > > > > > > > > > >
> > > > > > > > > > > > ..my 2 cents
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Dr. Radu Tudoran
> > > > > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D
> > Division
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > > > > > > German Research Center
> > > > > > > > > > > > Munich Office
> > > > > > > > > > > > Riesstrasse 25, 80992 München
> > > > > > > > > > > >
> > > > > > > > > > > > E-mail: radu.tudoran@huawei.com
> > > > > > > > > > > > Mobile: +49 15209084330
> > > > > > > > > > > > Telephone: +49 891588344173
> > > > > > > > > > > >
> > > > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany,
> > > www.huawei.com
> > > > > > > > > > > > Registered Office: Düsseldorf, Register Court
> > Düsseldorf,
> > > > HRB
> > > > > > > > 56063,
> > > > > > > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht
> > > Düsseldorf,
> > > > > HRB
> > > > > > > > 56063,
> > > > > > > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > > > > > > This e-mail and its attachments contain confidential
> > > > > > information
> > > > > > > > from
> > > > > > > > > > > > HUAWEI, which is intended only for the person or
> entity
> > > > whose
> > > > > > > > address
> > > > > > > > > > is
> > > > > > > > > > > > listed above. Any use of the information contained
> > herein
> > > > in
> > > > > > any
> > > > > > > > way
> > > > > > > > > > > > (including, but not limited to, total or partial
> > > > disclosure,
> > > > > > > > > > > reproduction,
> > > > > > > > > > > > or dissemination) by persons other than the intended
> > > > > > recipient(s)
> > > > > > > > is
> > > > > > > > > > > > prohibited. If you receive this e-mail in error,
> please
> > > > > notify
> > > > > > > the
> > > > > > > > > > sender
> > > > > > > > > > > > by phone or email immediately and delete it!
> > > > > > > > > > > >
> > > > > > > > > > > > -----Original Message-----
> > > > > > > > > > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > > > > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > > > > > > > > To: dev@flink.apache.org
> > > > > > > > > > > > Subject: [DISCUSS] Table API / SQL internal timestamp
> > > > > handling
> > > > > > > > > > > >
> > > > > > > > > > > > Hi everybody,
> > > > > > > > > > > >
> > > > > > > > > > > > I'd like to propose and discuss some changes in the
> way
> > > > > the
> > > > > > > > Table
> > > > > > > > > > API
> > > > > > > > > > > > / SQL internally handles timestamps.
> > > > > > > > > > > >
> > > > > > > > > > > > The Table API is implemented on top of the DataStream
> > > API.
> > > > > The
> > > > > > > > > > DataStream
> > > > > > > > > > > > API hides timestamps from users in order to ensure
> that
> > > > > > > timestamps
> > > > > > > > > and
> > > > > > > > > > > > watermarks are aligned. Instead users assign
> timestamps
> > > and
> > > > > > > > > watermarks
> > > > > > > > > > > once
> > > > > > > > > > > > (usually at the source or in a subsequent operator)
> and
> > > let
> > > > > the
> > > > > > > > > system
> > > > > > > > > > > > handle the timestamps from there on. Timestamps are
> > > stored
> > > > in
> > > > > > the
> > > > > > > > > > > timestamp
> > > > > > > > > > > > field of the StreamRecord which is a holder for the
> > user
> > > > > record
> > > > > > > and
> > > > > > > > > the
> > > > > > > > > > > > timestamp. DataStream operators that depend on time
> > > > > > > (time-windows,
> > > > > > > > > > > process
> > > > > > > > > > > > function, ...) access the timestamp from the
> > > StreamRecord.
> > > > > > > > > > > >
> > > > > > > > > > > > > In contrast to the DataStream API, the Table API and
> SQL
> > > are
> > > > > > aware
> > > > > > > > of
> > > > > > > > > > the
> > > > > > > > > > > > semantics of a query. I.e., we can analyze how users
> > > access
> > > > > > > > > timestamps
> > > > > > > > > > > and
> > > > > > > > > > > > whether they are modified or not. Another difference
> is
> > > > that
> > > > > > the
> > > > > > > > > > > timestamp
> > > > > > > > > > > > must be part of the schema of a table in order to
> have
> > > > > correct
> > > > > > > > query
> > > > > > > > > > > > semantics.
> > > > > > > > > > > >
> > > > > > > > > > > > The current design to handle timestamps is as
> follows.
> > > The
> > > > > > Table
> > > > > > > > API
> > > > > > > > > > > > stores timestamps in the timestamp field of the
> > > > StreamRecord.
> > > > > > > > > > Therefore,
> > > > > > > > > > > > timestamps are detached from the remaining data which
> > is
> > > > > stored
> > > > > > > in
> > > > > > > > > Row
> > > > > > > > > > > > objects. Hence, the physical representation of a row
> is
> > > > > > different
> > > > > > > > > from
> > > > > > > > > > > its
> > > > > > > > > > > > logical representation. We introduced a translation
> > layer
> > > > > > > > (RowSchema)
> > > > > > > > > > to
> > > > > > > > > > > > convert logical schema into physical schema. This is
> > > > > necessary
> > > > > > > for
> > > > > > > > > > > > serialization or code generation when the logical
> plan
> > is
> > > > > > > > translated
> > > > > > > > > > > into a
> > > > > > > > > > > > physical execution plan. Processing-time timestamps
> are
> > > > > > similarly
> > > > > > > > > > > handled.
> > > > > > > > > > > > They are not included in the physical schema and
> looked
> > > up
> > > > > when
> > > > > > > > > needed.
> > > > > > > > > > > > This design also requires that we need to materialize
> > > > > > timestamps
> > > > > > > > when
> > > > > > > > > > > they
> > > > > > > > > > > > are accessed by expressions. Timestamp
> materialization
> > is
> > > > > done
> > > > > > > as a
> > > > > > > > > > > > pre-optimization step.
> > > > > > > > > > > >
> > > > > > > > > > > > While thinking about the implementation of the
> > event-time
> > > > > > > windowed
> > > > > > > > > > > > stream-stream join [1] I stumbled over the question
> > which
> > > > > > > timestamp
> > > > > > > > > of
> > > > > > > > > > > both
> > > > > > > > > > > > input tables to forward. With the current design, we
> > > could
> > > > > only
> > > > > > > > have
> > > > > > > > > a
> > > > > > > > > > > > single timestamp, so keeping both timestamps would
> not
> > be
> > > > > > > possible.
> > > > > > > > > The
> > > > > > > > > > > > choice of the timestamp would need to be specified by
> > the
> > > > > query
> > > > > > > > > > otherwise
> > > > > > > > > > > > it would lack clear semantics. When executing the
> join,
> > > the
> > > > > > join
> > > > > > > > > > operator
> > > > > > > > > > > > would need to make sure that no late data is emitted.
> > > This
> > > > > > would
> > > > > > > > only
> > > > > > > > > > > work
> > > > > > > > > > > > > if the operator was able to hold back watermarks [2].
> > > > > > > > > > > >
> > > > > > > > > > > > With this information in mind, I'd like to discuss
> the
> > > > > > following
> > > > > > > > > > > proposal:
> > > > > > > > > > > >
> > > > > > > > > > > > - We allow more than one event-time timestamp and
> store
> > > > them
> > > > > > > > directly
> > > > > > > > > > in
> > > > > > > > > > > > the Row
> > > > > > > > > > > > - The query operators ensure that the watermarks are
> > > always
> > > > > > > behind
> > > > > > > > > all
> > > > > > > > > > > > event-time timestamps. With additional analysis we
> will
> > > be
> > > > > able
> > > > > > > to
> > > > > > > > > > > restrict
> > > > > > > > > > > > this to timestamps that are actually used as such.
> > > > > > > > > > > > - When a DataStream operator is time-based (e.g., a
> > > > > DataStream
> > > > > > > > > > > > time-windows), we inject an operator that copies the
> > > > > timestamp
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > > Row
> > > > > > > > > > > > into the StreamRecord.
> > > > > > > > > > > > - We try to remove the distinction between logical
> and
> > > > > physical
> > > > > > > > > schema.
> > > > > > > > > > > > For event-time timestamps this is because we store
> them
> > > in
> > > > > the
> > > > > > > Row
> > > > > > > > > > > object,
> > > > > > > > > > > > for processing-time timestamps, we add a dummy byte
> > > field.
> > > > > When
> > > > > > > > > > > accessing a
> > > > > > > > > > > > field of this type, the code generator injects the
> code
> > > to
> > > > > > fetch
> > > > > > > > the
> > > > > > > > > > > > timestamps.
> > > > > > > > > > > > - We might be able to get around the pre-optimization
> > > time
> > > > > > > > > > > materialization
> > > > > > > > > > > > step.
> > > > > > > > > > > > - A join result would be able to keep both
> timestamps.
> > > The
> > > > > > > > watermark
> > > > > > > > > > > would
> > > > > > > > > > > > > be held back for both so both could be used in
> > subsequent
> > > > > > > > operations.
> > > > > > > > > > > >
> > > > > > > > > > > > I admit, I haven't thought this completely through.
> > > > > > > > > > > > However, the benefits of this design from my point of
> > > view
> > > > > are:
> > > > > > > > > > > > - encoding of timestamps in Rows means that the
> logical
> > > > > schema
> > > > > > is
> > > > > > > > > equal
> > > > > > > > > > > to
> > > > > > > > > > > > the physical schema
> > > > > > > > > > > > - no timestamp materialization
> > > > > > > > > > > > - support for multiple timestamps. Otherwise we would
> > > need
> > > > to
> > > > > > > > expose
> > > > > > > > > > > > internal restrictions to the user which are hard to
> > > > explain /
> > > > > > > > > > > communicate.
> > > > > > > > > > > > - no need to change any public interfaces at the
> > moment.
> > > > > > > > > > > >
> > > > > > > > > > > > The drawbacks as far as I see them are:
> > > > > > > > > > > > - additional payload due to unused timestamp field +
> > > > possibly
> > > > > > the
> > > > > > > > > > > > processing-time dummy field
> > > > > > > > > > > > - complete rework of the internal timestamp logic
> > > > (again...)
> > > > > > > > > > > >
> > > > > > > > > > > > Please let me know what you think,
> > > > > > > > > > > > Fabian
> > > > > > > > > > > >
> > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
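
As a rough illustration of the hold-back idea in the proposal quoted above (the watermarks stay behind all event-time timestamps that an operator may still emit), the following Python sketch computes the watermark an operator could forward. The function name and the integer millisecond timestamps are assumptions made for this example; this is not Flink code.

```python
from typing import Iterable


def held_back_watermark(input_watermark: int,
                        pending_timestamps: Iterable[int]) -> int:
    """Return a watermark that stays behind every timestamp still to be emitted.

    A watermark t asserts that no record with a timestamp <= t will follow,
    so the forwarded watermark must be strictly smaller than the smallest
    timestamp the operator may emit in the future.
    """
    smallest = min(pending_timestamps, default=None)
    if smallest is None:
        # Nothing is buffered, so the input watermark can pass unchanged.
        return input_watermark
    return min(input_watermark, smallest - 1)
```

For a join result that keeps both timestamps, `pending_timestamps` would have to contain the timestamps of both fields, which is why the smallest future timestamp depends on two fields.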

Re: [DISCUSS] Table API / SQL internal timestamp handling

Posted by Fabian Hueske <fh...@gmail.com>.
Hi everybody,

I created FLINK-7337 [1] and will work on this.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7337

2017-08-01 11:36 GMT+02:00 Fabian Hueske <fh...@gmail.com>:

> Hi,
>
> I think the implementation of the join operator should not depend on the
> synchronization of the watermarks.
> If we need to buffer a stream because we have to wait for future records
> from the other stream to join them, then that's how it is. We cannot change
> the semantics of the query.
> It might be possible to apply backpressure to the source to delay one
> stream, but this is a very delicate thing to do and can easily lead to
> deadlocks and can have unpredictable side effects. So I would just consume
> the stream and put it into the state.
>
> From my point of view, timestamps cannot be "irreconcilable" because the
> query semantics and input streams are given.
> A join will always need to buffer some records. It is true that the join
> operator might need to buffer a lot of records if the input streams and
> join predicate are not aligned.
> IMO, this is fine as long as the query has enough resources. Once it runs
> out of resources, it simply fails and can be restarted with more resources
> or adapted.
>
> Cheers, Fabian
>
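
To make the buffering argument concrete for the example join in this thread (l.ts between r.ts - 600 minutes and r.ts - 599 minutes), here is a minimal Python sketch of when a buffered row could be dropped from state. The function names and the millisecond representation are assumptions for illustration only; they are not part of Flink.

```python
MINUTE_MS = 60_000
LOWER_MS = 599 * MINUTE_MS  # smallest allowed r.ts - l.ts
UPPER_MS = 600 * MINUTE_MS  # largest allowed r.ts - l.ts


def can_evict_left(l_ts: int, right_watermark: int) -> bool:
    # An l row joins r rows with r.ts in [l_ts + LOWER_MS, l_ts + UPPER_MS].
    # Once the r watermark reaches the upper end, no further match can arrive.
    return right_watermark >= l_ts + UPPER_MS


def can_evict_right(r_ts: int, left_watermark: int) -> bool:
    # An r row joins l rows with l.ts in [r_ts - UPPER_MS, r_ts - LOWER_MS].
    # Once the l watermark reaches the upper end, no further match can arrive.
    return left_watermark >= r_ts - LOWER_MS
```

Under these rules, with synchronized watermarks an l row stays in state for about 600 minutes, while an l stream that lags by 10 hours lets l rows be dropped on arrival and keeps r rows for about one minute. The operator logic is the same in both cases; only the amount of state differs.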
>
> 2017-08-01 6:04 GMT+02:00 Xingcan Cui <xi...@gmail.com>:
>
>> Hi Shaoxuan,
>>
>> I really appreciate your prompt reply. What you explained makes sense to
>> me.
>>
>> There is only one point on which I have a different view: "we have to
>> buffer all the delta data between watermarks of two inputs".
>>
>> Consider the following SQL on joining two streams l and r:
>>
>> SELECT * FROM l, r
>> WHERE l.name = r.name
>> AND l.ts BETWEEN r.ts - INTERVAL '600' MINUTE
>>     AND r.ts - INTERVAL '599' MINUTE;
>>
>> This query is valid since it holds both an equi-key and a time span
>> restriction.
>>
>> There are two different situations when executing the query: (1) if the
>> timestamps of l and r are synchronized, e.g., they both contain newly
>> generated events, we must buffer the l stream for 600 minutes; and (2) if
>> there exists a natural offset between the two streams, e.g., the r stream
>> is newly generated while the l stream is sourced from an event queue
>> generated 10 hours ago, it is unnecessary to buffer so much data.
>>
>> That raises the question: what if the timestamps of the two streams are
>> essentially "irreconcilable"?
>>
>> Best,
>> Xingcan
>>
>> On Mon, Jul 31, 2017 at 10:42 PM, Shaoxuan Wang <ws...@gmail.com>
>> wrote:
>>
>> > Xingcan,
>> > A watermark is the "estimate of completion". The user defines the
>> > watermark for each input based on the best estimate of when it has seen
>> > pretty much all the data. It is usually calculated from the event
>> > timestamp.
>> > When we do a windowed join, we have to make sure the watermarks of both
>> > inputs have been received before emitting a window result at this
>> > watermark. If the two inputs differ widely, say "one for today and the
>> > other one for yesterday" as you pointed out, the watermark for the
>> > windowed join operator is just yesterday. I guess this is what Fabian
>> > means by "In case of a join, the smallest future timestamp depends on two
>> > fields and not just on one." In the windowed join cases, we have to buffer
>> > all the delta data between watermarks of two inputs. It is the user's
>> > responsibility (if she/he wants to reduce the cost) to align the
>> > watermarks of the stream sources as much as possible.
>> >
>> > Regards,
>> > Shaoxuan
>> >
>> >
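
The "today versus yesterday" remark above can be illustrated with a small Python sketch in which a two-input operator forwards the minimum of the watermarks it has seen on both inputs. The class and method names are hypothetical and only serve this example.

```python
class TwoInputWatermark:
    """Tracks the watermark of an operator with two inputs."""

    def __init__(self) -> None:
        # No watermark has been received on either input yet.
        self.left = float("-inf")
        self.right = float("-inf")

    def on_left(self, watermark: float) -> float:
        # Watermarks never move backwards on an input.
        self.left = max(self.left, watermark)
        return self.current()

    def on_right(self, watermark: float) -> float:
        self.right = max(self.right, watermark)
        return self.current()

    def current(self) -> float:
        # The operator can only be as far along as its slowest input.
        return min(self.left, self.right)
```

If one input reports "today" and the other "yesterday", `current()` stays at "yesterday", and everything received from the faster input in between has to be kept in state.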
>> > On Mon, Jul 31, 2017 at 10:09 PM, Xingcan Cui <xi...@gmail.com>
>> wrote:
>> >
>> > > Hi Fabian,
>> > >
>> > > I have a question similar to Jark's. Theoretically, the row times of
>> > > two streams could be quite different, e.g., one for today and the
>> > > other one for yesterday.
>> > > How can we align them?
>> > >
>> > > Best,
>> > > Xingcan
>> > >
>> > > On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fh...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Jark,
>> > > >
>> > > > yes, the handling of watermarks is very tricky. It is not directly
>> > > > related to the proposal, which is only about the representation of
>> > > > timestamps, but becomes important for event-time joins.
>> > > > We have a JIRA about an operator that is able to hold back watermarks
>> > > > [1].
>> > > >
>> > > > Roughly the idea is to track the smallest timestamp that will be
>> > > > emitted in the future and align the watermark to this timestamp.
>> > > > For this we need to know the semantics of the operator (which
>> > > > timestamp will be emitted in the future) but this will be given for
>> > > > relational operators.
>> > > > The new operator could emit a watermark whenever it received one.
>> > > >
>> > > > In case of a join, the smallest future timestamp depends on two
>> > > > fields and not just on one.
>> > > >
>> > > > Best,
>> > > > Fabian
>> > > >
>> > > > [1] https://issues.apache.org/jira/browse/FLINK-7245
>> > > >
>> > > >
>> > > > 2017-07-31 14:35 GMT+02:00 Jark Wu <ja...@apache.org>:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > @Fabian, I read your proposal carefully again, and I'm a big +1 to
>> > > > > do it. The proposal can address the problem of how to forward both
>> > > > > input tables' rowtimes in a dual stream join (windowed/non-windowed).
>> > > > > The additional payload drawback is acceptable.
>> > > > >
>> > > > > You mentioned that:
>> > > > >
>> > > > > > The query operators ensure that the watermarks are always behind
>> > > > > > all event-time timestamps. With additional analysis we will be
>> > > > > > able to restrict this to timestamps that are actually used as such.
>> > > > >
>> > > > > I'm more curious about how we can define the watermark strategies in
>> > > > > order to make sure all timestamp columns are aligned to watermarks,
>> > > > > especially when the watermark has been defined in the input
>> > > > > DataStream.
>> > > > >
>> > > > > Bests,
>> > > > > Jark Wu
>> > > > >
>> > > > >
>> > > > > 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xi...@gmail.com>:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > Thanks for the answers, @Fabian.
>> > > > > >
>> > > > > > @Jark, at first I also wanted the users to reassign the timestamp
>> > > > > > field arbitrarily. However, that means we have to break the
>> > > > > > current "time system" and create a new one. The blocked watermarks
>> > > > > > become meaningless and maybe a new WatermarkAssigner should be
>> > > > > > provided. A slightly stricter mechanism would be to only allow the
>> > > > > > use of the existing timestamp fields. It sounds reasonable, but
>> > > > > > will bring an unnecessary barrier between stream and batch SQL,
>> > > > > > i.e., some SQL that works in batch cannot be executed in the
>> > > > > > stream environment. I just wonder if we could automatically choose
>> > > > > > a field, which will be used in the following calculations. Not
>> > > > > > sure if it makes sense.
>> > > > > >
>> > > > > > @Shaoxuan @Radu, I totally agree that the "proctime" is the main
>> > > > > > obstacle for consolidating stream/batch SQL. Though from a general
>> > > > > > point of view it can indicate the time to some extent, its
>> > > > > > randomness means that it should never be used in time-sensitive
>> > > > > > applications. I always believe that all the information used for
>> > > > > > query evaluation should be acquired from the data itself.
>> > > > > >
>> > > > > > Best,
>> > > > > > Xingcan
>> > > > > >
>> > > > > > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <
>> fhueske@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Shaoxuan,
>> > > > > > >
>> > > > > > > thanks for your comments. I agree with your comment:
>> > > > > > >
>> > > > > > > > The problem we used to have is that we have treated eventtime
>> > > > > > > > column as a special timestamp column.
>> > > > > > >
>> > > > > > > IMO, an event-time timestamp column is a regular column that is
>> > > > > > > aligned with the watermarks of the stream.
>> > > > > > > In order to distinguish watermark-aligned columns from others, we
>> > > > > > > need a special flag in the schema.
>> > > > > > > When a timestamp column is modified and we cannot guarantee that
>> > > > > > > it is still aligned with the watermarks, it must lose the special
>> > > > > > > flag and be treated like any other column.
>> > > > > > >
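
The flag rule described above can be sketched in a few lines of Python. The `Column` type, its fields, and the `project` helper are hypothetical names chosen for this illustration; they do not correspond to classes in the Table API.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Column:
    name: str
    type_name: str
    is_rowtime: bool = False  # True if the column is aligned with the watermarks


def project(column: Column, new_name: str, modified: bool) -> Column:
    """Forward a column under a new name.

    The rowtime flag survives only if the values are passed through
    unchanged; any modification may break the watermark alignment, so the
    result is treated like a regular column.
    """
    keeps_flag = column.is_rowtime and not modified
    return replace(column, name=new_name, is_rowtime=keeps_flag)
```

For example, renaming a rowtime column keeps the flag, while an expression such as adding an offset to it would yield a plain column that can no longer be used for event-time windows.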
>> > > > > > > Regarding your comments:
>> > > > > > > 1) I agree that we can use Long in addition to Timestamp as a
>> > > > > > > timestamp column. Since timestamp columns need to be comparable to
>> > > > > > > watermarks, which are Longs, I don't see that other types would
>> > > > > > > make sense. For now, I would keep the restriction that timestamps
>> > > > > > > can only be of Timestamp type. I think extending this to Long
>> > > > > > > would be a follow-up issue to the changes I proposed here.
>> > > > > > > 2) Relates to 1) and I agree. If we use a Long attribute as
>> > > > > > > timestamp, it should remain of type Long. For now I would keep
>> > > > > > > converting it to Timestamp and change that later.
>> > > > > > > 3) Yes, timestamp columns must be aligned to watermarks. That's
>> > > > > > > their primary characteristic. How to define watermark strategies
>> > > > > > > is orthogonal to this discussion, IMO.
>> > > > > > > 4) From my point of view, proc-time is a purely virtual column and
>> > > > > > > not related to an actual (data) column. However, it must be part
>> > > > > > > of the schema and treated like any other attribute for a good user
>> > > > > > > experience and SQL compliance. In order to be able to join two
>> > > > > > > tables on processing time, it must be possible to include a
>> > > > > > > processing time column in the schema definition of the table.
>> > > > > > > Processing time queries can never compute the same results as
>> > > > > > > batch queries but their semantics should be aligned with
>> > > > > > > event-time queries.
>> > > > > > >
>> > > > > > > Best, Fabian
>> > > > > > >
>> > > > > > > 2017-07-27 9:47 GMT+02:00 Radu Tudoran <
>> radu.tudoran@huawei.com
>> > >:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > @Shaoxuan - thanks for the remarks. I have a question regarding
>> > > > > > > > your suggestion not to allow creating a proctime window on a
>> > > > > > > > regular column. I think this would be useful though. First, you
>> > > > > > > > might need to carry the timestamp indicator of when the
>> > > > > > > > processing happened (for log purposes, provenance, traceability
>> > > > > > > > ...). Second, I do not think it contradicts the semantics of
>> > > > > > > > batch SQL, as in SQL you have the function "now()", which pretty
>> > > > > > > > much carries the same semantics as having a function to mark the
>> > > > > > > > proctime and then projecting this into a column. If I am not
>> > > > > > > > mistaken you can store the result of calling now() in database
>> > > > > > > > columns.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Dr. Radu Tudoran
>> > > > > > > > Staff Research Engineer - Big Data Expert
>> > > > > > > > IT R&D Division
>> > > > > > > >
>> > > > > > > > -----Original Message-----
>> > > > > > > > From: Shaoxuan Wang [mailto:shaoxuan@apache.org]
>> > > > > > > > Sent: Thursday, July 27, 2017 6:00 AM
>> > > > > > > > To: Dev
>> > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
>> > > handling
>> > > > > > > >
>> > > > > > > >  Hi Everyone,
>> > > > > > > > I like this proposal. The problem we used to have is that we have
>> > > > > > > > treated eventtime column as a special timestamp column. An
>> > > > > > > > eventtime column is no different from any other regular column,
>> > > > > > > > except that it carries a flag (eventtime-indicator) indicating
>> > > > > > > > that this column can be used as an eventtime to decide when a
>> > > > > > > > bounded query can emit the final result by comparing it with the
>> > > > > > > > associated waterMark.
>> > > > > > > >
>> > > > > > > > I have a few comments adding on top of this (they may have already
>> > > > > > > > been addressed in the conversation; since it's a long discussion,
>> > > > > > > > I may have missed something):
>> > > > > > > >
>> > > > > > > >    1. While we remove the timestamp column, we introduce an
>> > > > > > > >    eventtime-indicator (we may already have this concept). It is
>> > > > > > > >    only a flag that can be applied to any column (note that some
>> > > > > > > >    types may not be usable as an eventtime column), indicating
>> > > > > > > >    whether this column can be used as eventtime or not. This flag
>> > > > > > > >    is useful for validation and codeGen.
>> > > > > > > >    2. A column that has been used as an eventtime should not lose
>> > > > > > > >    its own type. We should not cast all eventtime columns to the
>> > > > > > > >    timestamp type. For instance, if a column is of long type, it
>> > > > > > > >    will stay of long type even if a window aggregate has used it
>> > > > > > > >    as an eventtime.
>> > > > > > > >    3. Eventtime will only work well with an associated waterMark
>> > > > > > > >    strategy. We may consider forcing the user to provide waterMark
>> > > > > > > >    logic for his/her selected eventtime.
>> > > > > > > >    4. For proctime, I hope we do not introduce a
>> > > > > > > >    proctime-indicator for regular columns. Ideally we should not
>> > > > > > > >    allow users to create a proctime window on a regular column, as
>> > > > > > > >    this is against the batch query semantics. Therefore I suggest
>> > > > > > > >    we always introduce a proctime timestamp column for users to
>> > > > > > > >    create a proctime window. And unlike eventtime, proctime does
>> > > > > > > >    not need any associated waterMark strategy, as there is no
>> > > > > > > >    out-of-order issue for proctime.
>> > > > > > > >
>> > > > > > > > Regards,
>> > > > > > > > Shaoxuan
>> > > > > > > >
>> > > > > > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <
>> > > fhueske@gmail.com>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Thanks everybody for the replies so far.
>> > > > > > > > >
>> > > > > > > > > Let me answer your questions and reply to your thoughts:
>> > > > > > > > >
>> > > > > > > > > Radu:
>> > > > > > > > > ---
>> > > > > > > > > First of all, although my proposal is motivated by a join
>> > > > operator,
>> > > > > > > > > this discussion is about timestamp handling, not about
>> joins
>> > in
>> > > > > > > general.
>> > > > > > > > >
>> > > > > > > > > - The semantics of outer joins is to emit null and there
>> is
>> > no
>> > > > way
>> > > > > > > > > around that. This is not an issue for us. Actually, outer
>> > joins
>> > > > are
>> > > > > > > > > supported by the batch SQL / Table API. It is true that
>> outer
>> > > > joins
>> > > > > > > > > might result in null timestamps. Calcite will mark those
>> > fields
>> > > > as
>> > > > > > > > > nullable and we should check that timestamps which are
>> used
>> > in
>> > > > > > windows
>> > > > > > > > or joins are not nullable.
>> > > > > > > > > - The query has to explicitly specify which timestamp
>> > attribute
>> > > > to
>> > > > > > use.
>> > > > > > > > > Otherwise its semantics are not complete and it is
>> invalid. A
>> > > > > > > > > group-window that follows a join will reference a
>> timestamp
>> > > > > attribute
>> > > > > > > > > and this will be used. The other timestamp might be
>> projected
>> > > > out.
>> > > > > > > > > When a result with two timestamps is converted into a
>> > > DataStream,
>> > > > > the
>> > > > > > > > > user has to decide. This could be done inside of the
>> Table to
>> > > > > > > > > DataStream conversion. If the Table has more than one
>> valid
>> > > > > > timestamp,
>> > > > > > > > > the conversion will ask which timestamp to forward.
>> > > > > > > > > - A proctime join should forward all proctime attributes
>> of
>> > the
>> > > > > input
>> > > > > > > > > tables. All will be the same, but that does not matter
>> > because
>> > > > they
>> > > > > > > > > are either virtual or represented as 1 byte dummy
>> attributes.
>> > > > Also,
>> > > > > > > > > unused ones will be automatically projected out anyway.
>> > > > > > > > > - An event-time join should forward all event-time
>> attributes
>> > > of
>> > > > > the
>> > > > > > > > > input tables. Creating a new event-time attribute using
>> > > > processing
>> > > > > > > > > time makes event-time processing pointless and will give
>> > > > completely
>> > > > > > > > random results.
>> > > > > > > > > Event-time is not about the "time an event is created" but
>> > > about
>> > > > a
>> > > > > > > > > timestamp that is associated with an event. For example an
>> > > order
>> > > > > > event
>> > > > > > > > > could have three timestamps: "orderTime", "shipTime", and
>> > > > > > > "receiveTime".
>> > > > > > > > > Each could be a valid event-time attribute.
>> > > > > > > > >
>> > > > > > > > > Jark:
>> > > > > > > > > ---
>> > > > > > > > > Thanks for the proposal. I think I understand what you
>> want
>> > to
>> > > > > > achieve
>> > > > > > > > > with this, but I think functions to instantiate time
>> > attributes
>> > > > are
>> > > > > > > > > not necessary and would make things more complicated. The
>> > point
>> > > > of
>> > > > > > > > > supporting multiple time attributes is to ensure that all
>> of
>> > > them
>> > > > > are
>> > > > > > > > > aligned with the watermarks. If we add a method
>> > > > ROW_TIME(timestamp)
>> > > > > > > > > we won't know if the timestamp is aligned with the
>> > > > watermarks.
>> > > > > If
>> > > > > > > > > that is not the case, the query won't be executed as
>> > expected.
>> > > > The
>> > > > > > > > > issue of LEFT JOIN can easily be addressed by checking for
>> > > > > > > > > nullability during optimization when an operator tries to
>> > use
>> > > > it.
>> > > > > > > > >
>> > > > > > > > > The beauty of supporting multiple timestamps is that a
>> user
>> > > does
>> > > > > not
>> > > > > > > > > have to care at all about timestamps (or timestamp
>> functions)
>> > > and
>> > > > > > > > > watermarks. As long as the query uses a timestamp
>> attribute
>> > > that
>> > > > > was
>> > > > > > > > > originally declared as rowtime in a source table (and was
>> not
>> > > > > > modified
>> > > > > > > > > afterwards), this is fine. Think of a cascade of three
>> > windowed
>> > > > > > joins:
>> > > > > > > > > R - S - T - U, and you want to join S - T first. In that
>> > case,
>> > > > you
>> > > > > > > > > need to preserve the timestamps of S and T in order to
>> join R
>> > > and
>> > > > > U.
>> > > > > > > > > From a relational algebra point of view, there is no
>> reason
>> > to
>> > > > > have a
>> > > > > > > > > limitation on how these attributes are accessed.
>> Timestamps
>> > are
>> > > > > just
>> > > > > > > > > regular fields of a record. The only restriction in the
>> > context
>> > > > of
>> > > > > > > > > stream processing is that the watermark must be aligned
>> with
>> > > > > > > > > timestamps, i.e., follow all timestamps such that data is
>> not
>> > > > late
>> > > > > > > > > according to any of the timestamps. This we can achieve
>> and
>> > > > handle
>> > > > > > > > internally without the user having to worry about it.
>> > > > > > > > >
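The restriction that data must not be late "according to any of the timestamps" can be sketched as follows. This is only an illustration: it assumes the usual convention that a watermark w asserts no further records with a timestamp <= w will arrive, and MultiTimestampCheck with its methods is a made-up helper, not part of Flink or the Table API.

```java
/** Hypothetical helper for illustration only; not a Flink class. */
final class MultiTimestampCheck {

    /**
     * Returns true if a row carrying several event-time fields is on time
     * with respect to the given watermark, i.e. none of its timestamps is
     * at or before the watermark.
     */
    static boolean isOnTime(long watermark, long... eventTimes) {
        for (long ts : eventTimes) {
            if (ts <= watermark) {
                return false; // late according to at least one timestamp
            }
        }
        return true;
    }

    /**
     * Largest watermark that still keeps the row on time for every field:
     * one less than the smallest timestamp in the row.
     */
    static long maxSafeWatermark(long... eventTimes) {
        long min = Long.MAX_VALUE;
        for (long ts : eventTimes) {
            min = Math.min(min, ts);
        }
        return min - 1;
    }
}
```

For an order row with orderTime=100, shipTime=250 and receiveTime=400, the watermark has to stay below 100 for the row to be on time for all three attributes.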
>> > > > > > > > > Xingcan:
>> > > > > > > > > ---
>> > > > > > > > > I think your questions are mostly implementation details
>> and
>> > > not
>> > > > so
>> > > > > > > > > much related to the original proposal of supporting
>> multiple
>> > > > > > > timestamps.
>> > > > > > > > >
>> > > > > > > > > My take on your questions is:
>> > > > > > > > > 1. The rate at which watermarks are emitted is not
>> important
>> > > for
>> > > > > the
>> > > > > > > > > correctness of a query. However, it can affect the
>> > performance,
>> > > > > > > > > because each watermark is sent as a special record and it
>> is
>> > > > > > > > > broadcast. My initial take would be to emit a new
>> watermark
>> > > > > > whenever
>> > > > > > > > > the operator updated its watermark because usually, the
>> > > operator
>> > > > > > would
>> > > > > > > > > have forwarded the old watermark.
>> > > > > > > > > 2. I would say this is the responsibility of the operator
>> > > because
>> > > > > > > > > first it is not related to the semantics of the query and
>> > > second
>> > > > it
>> > > > > > is
>> > > > > > > > > an operator responsibility in the existing code as well.
>> > > > > > > > >
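A minimal sketch of the emission rule from point 1, assuming a multi-input operator whose own watermark is the minimum of the watermarks seen on its inputs. MinWatermarkTracker is a hypothetical helper written for this illustration and is not the actual operator code.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical helper for illustration only; not a Flink class. */
final class MinWatermarkTracker {
    private final long[] inputWatermarks;
    private long emitted = Long.MIN_VALUE;

    MinWatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark received on one input. Returns the combined
     * watermark only when it advanced, so that nothing is forwarded
     * while the slower input holds the operator back.
     */
    OptionalLong onWatermark(int input, long watermark) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long combined = Arrays.stream(inputWatermarks).min().getAsLong();
        if (combined > emitted) {
            emitted = combined;
            return OptionalLong.of(combined);
        }
        return OptionalLong.empty();
    }
}
```

With two inputs, a watermark of 100 on input 0 emits nothing until input 1 reports a watermark as well; the combined value then follows the slower of the two.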
>> > > > > > > > > Jark 2:
>> > > > > > > > > You are right, the query (or user) must decide on the
>> > > event-time
>> > > > > > > > > attribute to use. My main point is, it is much easier for
>> the
>> > > > user
>> > > > > > > > > (and for us
>> > > > > > > > > internally) if we internally track multiple timestamps.
>> > Because
>> > > > we
>> > > > > do
>> > > > > > > > > not have to prune the timestamp that will not be used later in the join.
>> > > > > > > > > Moreover, both timestamps might be used later (see join
>> > > example,
>> > > > > > which
>> > > > > > > > > could be reordered of course). All we have to do is to
>> ensure
>> > > > that
>> > > > > > all
>> > > > > > > > > timestamps are aligned with the watermarks.
>> > > > > > > > >
>> > > > > > > > > Radu 2:
>> > > > > > > > > IMO, time (or anything else that affects the semantics)
>> > should
>> > > > > never
>> > > > > > > > > be decided by the system. When we would do that, a query
>> is
>> > not
>> > > > > fully
>> > > > > > > > > specified or, even worse, the way it is executed is
>> > > semantically
>> > > > > > > > > incorrect and produces arbitrary results.
>> > > > > > > > >
>> > > > > > > > > Time attributes should be specified in the source tables
>> and
>> > > then
>> > > > > > > > > forwarded from there. So far I haven't seen an example
>> where
>> > > this
>> > > > > > > > > would not be possible (within the semantics of relational
>> > > > queries).
>> > > > > > If
>> > > > > > > > > we do that right, there won't be a need for explicit time
>> > > > > management
>> > > > > > > > > except for the definition of the initial timestamps which
>> can
>> > > be
>> > > > > > > > > hidden in the table definition. As I said before, we (or
>> the
>> > > > > system)
>> > > > > > > > > cannot decide on the timestamp because that would lead to
>> > > > arbitrary
>> > > > > > > > > results. Asking the user to do that would mean explicit
>> time
>> > > > > > > > > management which is also not desirable. I think my
>> proposal
>> > > gives
>> > > > > > > > > users all options (timestamps) to choose from and the
>> system
>> > can
>> > > > do
>> > > > > > the
>> > > > > > > > rest.
>> > > > > > > > >
>> > > > > > > > > Best, Fabian
>> > > > > > > > >
>> > > > > > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <
>> > > radu.tudoran@huawei.com
>> > > > >:
>> > > > > > > > >
>> > > > > > > > > > Hi everyone,
>> > > > > > > > > >
>> > > > > > > > > > I just want to add that I was referring to NULL values
>> not
>> > > > > > > > > > specifically
>> > > > > > > > > to
>> > > > > > > > > > timefields but to the event itself. If you have the
>> following
>> > > > > > situation
>> > > > > > > > > >
>> > > > > > > > > > Stream 1:     .... |    event1   | ....
>> > > > > > > > > > Stream 2:     .... |             | ....
>> > > > > > > > > >
>> > > > > > > > > > And you have a LEFT JOIN between stream 1 and stream 2
>> (no
>> > > > > > > > > > condition)...then you still need to emit (event1,null)
>> ...
>> > as
>> > > > > this
>> > > > > > > > > > is the behavior of left join. This is maybe a very
>> simple
>> > > > > > situation,
>> > > > > > > > > > but the
>> > > > > > > > > point
>> > > > > > > > > > is that left joins and right joins can have situation
>> when
>> > > you
>> > > > > have
>> > > > > > > > > > elements only in the main stream and no element in the
>> > right
>> > > > > > stream.
>> > > > > > > > > > And for this case you still need to emit.
>> > > > > > > > > >
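The LEFT JOIN behavior described above can be sketched over two finite buffers. This is an illustration only: LeftJoinSketch is a made-up name, events are plain strings, and a real streaming join would additionally need to decide when the right side counts as empty.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not a Flink class. */
final class LeftJoinSketch {

    /**
     * Unconditional LEFT JOIN of two buffers. Each result is a
     * {left, right} pair; the right element is null when the right
     * buffer holds nothing to pair the left event with.
     */
    static List<String[]> leftJoin(List<String> left, List<String> right) {
        List<String[]> out = new ArrayList<>();
        for (String l : left) {
            if (right.isEmpty()) {
                out.add(new String[] {l, null}); // emit (event1, null)
            } else {
                for (String r : right) {
                    out.add(new String[] {l, r});
                }
            }
        }
        return out;
    }
}
```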
>> > > > > > > > > >
>> > > > > > > > > > Regarding whether time should be decided by system or
>> > not...i
>> > > > > think
>> > > > > > > > > > the answer is it depends. I think the example from Jark
>> is
>> > > very
>> > > > > > good
>> > > > > > > > > > and
>> > > > > > > > > shows
>> > > > > > > > > > the need for some mechanisms to select/manage the time
>> (I
>> > > like
>> > > > > the
>> > > > > > > > > proposal
>> > > > > > > > > > of having functions to insert the time in the output!).
>> > > > However,
>> > > > > if
>> > > > > > > > > > a business analyst would write a query without explicit
>> > time
>> > > > > > > > > > management we still need to have some default behavior
>> in
>> > the
>> > > > > > > > > > system. As per my initial proposal, I think  we need to
>> > > decide
>> > > > on
>> > > > > > > > > > one timestamp field to carry (either a new one at the
>> > moment
>> > > of
>> > > > > the
>> > > > > > > > > > join) or the timestamp from the
>> > > > > > > > > main
>> > > > > > > > > > stream  (...although I am not sure which one is the main
>> > > stream
>> > > > > in
>> > > > > > > > > > the
>> > > > > > > > > case
>> > > > > > > > > > of a full join:) )
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Dr. Radu Tudoran
>> > > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D
>> Division
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > > > > > > > > German Research Center
>> > > > > > > > > > Munich Office
>> > > > > > > > > > Riesstrasse 25, 80992 München
>> > > > > > > > > >
>> > > > > > > > > > E-mail: radu.tudoran@huawei.com
>> > > > > > > > > > Mobile: +49 15209084330
>> > > > > > > > > > Telephone: +49 891588344173
>> > > > > > > > > >
>> > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > > > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany,
>> www.huawei.com
>> > > > > > > > > > Registered Office: Düsseldorf, Register Court
>> Düsseldorf,
>> > HRB
>> > > > > > 56063,
>> > > > > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
>> > > > > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht
>> Düsseldorf,
>> > > HRB
>> > > > > > 56063,
>> > > > > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
>> > > > > > > > > > This e-mail and its attachments contain confidential
>> > > > information
>> > > > > > from
>> > > > > > > > > > HUAWEI, which is intended only for the person or entity
>> > whose
>> > > > > > address
>> > > > > > > > is
>> > > > > > > > > > listed above. Any use of the information contained
>> herein
>> > in
>> > > > any
>> > > > > > way
>> > > > > > > > > > (including, but not limited to, total or partial
>> > disclosure,
>> > > > > > > > > reproduction,
>> > > > > > > > > > or dissemination) by persons other than the intended
>> > > > recipient(s)
>> > > > > > is
>> > > > > > > > > > prohibited. If you receive this e-mail in error, please
>> > > notify
>> > > > > the
>> > > > > > > > sender
>> > > > > > > > > > by phone or email immediately and delete it!
>> > > > > > > > > >
>> > > > > > > > > > -----Original Message-----
>> > > > > > > > > > From: Jark Wu [mailto:jark@apache.org]
>> > > > > > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
>> > > > > > > > > > To: dev@flink.apache.org
>> > > > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal
>> timestamp
>> > > > > handling
>> > > > > > > > > >
>> > > > > > > > > > Hi Xingcan,
>> > > > > > > > > >
>> > > > > > > > > > IMO, I don't think event-time of join results could be
>> > > > > > automatically
>> > > > > > > > > > decided by system. Considering batch tables, if users
>> want
>> an
>> > > > > event
>> > > > > > > time
>> > > > > > > > > > window aggregation after join, user must specify the
>> time
>> > > field
>> > > > > > > > > explicitly
>> > > > > > > > > > (T1.rowtime or T2.rowtime or the computed result of
>> them).
>> > So
>> > > > in
>> > > > > > the
>> > > > > > > > case
>> > > > > > > > > > of streaming tables, the system also can't automatically
>> > > decide
>> > > > > the
>> > > > > > > > time
>> > > > > > > > > > field for users.
>> > > > > > > > > >
>> > > > > > > > > > In regards to the question you asked, I think we don't
>> need
>> > > to
>> > > > > > change
>> > > > > > > > the
>> > > > > > > > > > watermark no matter whether we choose the left rowtime or right
>> > > rowtime
>> > > > > or
>> > > > > > > the
>> > > > > > > > > > combination. Because the watermark has been aligned with
>> > the
>> > > > > > rowtime
>> > > > > > > in
>> > > > > > > > > the
>> > > > > > > > > > source. Maybe I'm wrong about this, please correct me if
>> > I'm
>> > > > > > missing
>> > > > > > > > > > something.
>> > > > > > > > > >
>> > > > > > > > > > What do you think?
>> > > > > > > > > >
>> > > > > > > > > > Regards,
>> > > > > > > > > > Jark
>> > > > > > > > > >
>> > > > > > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <
>> xingcanc@gmail.com
>> > >:
>> > > > > > > > > >
>> > > > > > > > > > > Hi all,
>> > > > > > > > > > >
>> > > > > > > > > > > @Fabian, thanks for raising this.
>> > > > > > > > > > >
>> > > > > > > > > > > @Radu and Jark, personally I think the timestamp
>> field is
>> > > > > > critical
>> > > > > > > > for
>> > > > > > > > > > > query processing and thus should be declared as (or
>> > > supposed
>> > > > to
>> > > > > > be)
>> > > > > > > > > > > NOT NULL. In addition, I think the event-time
>> semantic of
>> > > the
>> > > > > > join
>> > > > > > > > > > > results should be automatically decided by the system,
>> > > i.e.,
>> > > > we
>> > > > > > do
>> > > > > > > > not
>> > > > > > > > > > > hand it over to users so as to avoid some unpredictable
>> > > > > assignment.
>> > > > > > > > > > >
>> > > > > > > > > > > Generally speaking, consolidating different time
>> fields
>> > is
>> > > > > > possible
>> > > > > > > > > > > since all of them should ideally be monotonically
>> > > increasing.
>> > > > > > From
>> > > > > > > my
>> > > > > > > > > > > point of view, the problem lies in
>> > > > > > > > > > > (1) what's the relationship between the old and new
>> > > > watermarks.
>> > > > > > > Shall
>> > > > > > > > > > > they be one-to-one mapping or the new watermarks could
>> > skip
>> > > > > some
>> > > > > > > > > > > timestamps? And (2) who is in charge of emitting the
>> > > blocked
>> > > > > > > > > > > watermarks, the operator or the process function?
>> > > > > > > > > > >
>> > > > > > > > > > > I'd like to hear from you.
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Xingcan
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <
>> > jark@apache.org
>> > > >
>> > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Radu's concerns make sense to me, especially the
>> null
>> > > value
>> > > > > > > > > > > > timestamp and multi-proctime.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I also have something in mind. I would like to
>> > propose
>> > > > > some
>> > > > > > > time
>> > > > > > > > > > > > indicator built-in functions, e.g.
>> ROW_TIME(Timestamp
>> > ts)
>> > > > > will
>> > > > > > > > > > > > generate an event time logical attribute, PROC_TIME()
>> > will
>> > > > > > > generate
>> > > > > > > > a
>> > > > > > > > > > > > processing time logical attribute. It is similar to
>> > > > > > > TUMBLE_ROWTIME
>> > > > > > > > > > > > proposed in this PR https://github.com/apache/flink/pull/4199.
>> > > > > > > > These
>> > > > > > > > > > > > can be used in any queries, but there still can't be
>> > more
>> > > > > than
>> > > > > > > one
>> > > > > > > > > > > > rowtime attribute or more than one proctime
>> attribute
>> > in
>> > > a
>> > > > > > table
>> > > > > > > > > > schema.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Both selected timestamp fields from a JOIN query
>> > will
>> > > > be
>> > > > > > > > > > > materialized.
>> > > > > > > > > > > > If someone needs further down the computation based
>> on
>> > > the
>> > > > > > event
>> > > > > > > > > > > > time,
>> > > > > > > > > > > they
>> > > > > > > > > > > > need to create a new time attribute using the
>> > > ROW_TIME(...)
>> > > > > > > > > > > > function. And this can also solve the null timestamp
>> > > > problem
>> > > > > in
>> > > > > > > > LEFT
>> > > > > > > > > > > > JOIN, because we
>> > > > > > > > > > > can
>> > > > > > > > > > > > use a user defined function to combine the two
>> rowtimes
>> > > and
>> > > > > > make
>> > > > > > > > the
>> > > > > > > > > > > result
>> > > > > > > > > > > > as the event time attribute, e.g. SELECT
>> > > > > > ROW_TIME(udf(T1.rowtime,
>> > > > > > > > > > > > T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > What do you think?
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <
>> > > > > > radu.tudoran@huawei.com
>> > > > > > > >:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I think this is an interesting discussion and I
>> would
>> > > > like
>> > > > > to
>> > > > > > > add
>> > > > > > > > > > > > > some issues and give some feedback.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - For supporting the join we do not only need to
>> > think
>> > > of
>> > > > > the
>> > > > > > > > time
>> > > > > > > > > > > > > but also on the null values. For example if you
>> have
>> > a
>> > > > LEFT
>> > > > > > (or
>> > > > > > > > > > > > > RIGHT) JOIN between items of 2 input streams, and
>> the
>> > > > > > secondary
>> > > > > > > > > > > > > input is not
>> > > > > > > > > > > > available
>> > > > > > > > > > > > > you should still emit Row.of(event1, null)...as
>> far
>> > as
>> > > I
>> > > > > know
>> > > > > > > if
>> > > > > > > > > > > > > you
>> > > > > > > > > > > need
>> > > > > > > > > > > > > to serialize/deserialize null values to send them
>> > they
>> > > do
>> > > > > not
>> > > > > > > > > > > > > work. So
>> > > > > > > > > > > we
>> > > > > > > > > > > > > should include this scenario in the discussions.
>> > > > > > > > > > > > > - If we will have multiple timestamps in an (output) event, one
>> > > > > > > > > > > > > question is
>> > > > > > > > > > > > > how to select afterwards which is the primary time
>> > > field
>> > > > on
>> > > > > > > which
>> > > > > > > > > > > > > to operate. When we describe a query we might be
>> able
>> > > to
>> > > > > > > specify
>> > > > > > > > > > > > > (or we
>> > > > > > > > > > > get
>> > > > > > > > > > > > > this implicitly if we implement the carryon of
>> the 2
>> > > > > > > timestamps)
>> > > > > > > > > > > Select
>> > > > > > > > > > > > > T1.rowtime, T2.rowtime ...but if the output of a
>> > query
>> > > is
>> > > > > the
>> > > > > > > > > > > > > input of
>> > > > > > > > > > > a
>> > > > > > > > > > > > > new processing pipeline, then, do we support
>> > generally
>> > > > also
>> > > > > > > that
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > input
>> > > > > > > > > > > > > has 2 time fields? ...how do we deal with the 2
>> input
>> > > > > fields
>> > > > > > > > > > > > > (maybe I
>> > > > > > > > > > > am
>> > > > > > > > > > > > > missing something) further in the datastream
>> pipeline
>> > > > that
>> > > > > we
>> > > > > > > > > > > > > build
>> > > > > > > > > > > based
>> > > > > > > > > > > > > on the output?
>> > > > > > > > > > > > > - For the case of proctime - do we need to carry 2
>> > > > > proctimes
>> > > > > > > (the
>> > > > > > > > > > > > > proctimes of the incoming events from each
>> stream),
>> > or
>> > > 1
>> > > > > > > proctime
>> > > > > > > > > > > > > (as
>> > > > > > > > > > > we
>> > > > > > > > > > > > > operate on proctime and the combination of the 2
>> > inputs
>> > > > can
>> > > > > > be
>> > > > > > > > > > > considered
>> > > > > > > > > > > > > as a new event, the current proctime on the
>> machine
>> > can
>> > > > be
>> > > > > > > > > > > > > considered
>> > > > > > > > > > > the
>> > > > > > > > > > > > > (proc)time reference for output event) or 3
>> proctimes
>> > > > (the
>> > > > > 2
>> > > > > > > > > > > > > proctimes
>> > > > > > > > > > > of
>> > > > > > > > > > > > > the input plus the proctime when the new event was
>> > > > > created)?
>> > > > > > > > > > > > > - Similar to the point above, for event time
>> (which I
>> > > am
>> > > > > > > > > > > > > understanding
>> > > > > > > > > > > as
>> > > > > > > > > > > > > the time when the event was created...or do we
>> > > understand
>> > > > > > them
>> > > > > > > as
>> > > > > > > > > > > > > a
>> > > > > > > > > > > time
>> > > > > > > > > > > > > carry within the event?) - when we join 2 events
>> and
>> > > > output
>> > > > > > an
>> > > > > > > > > > > > > event
>> > > > > > > > > > > that
>> > > > > > > > > > > > > is the result of the join - isn't this a new event
>> > > detached
>> > > > > > from
>> > > > > > > > the
>> > > > > > > > > > > > > source\input events? ... I would tend to say it
>> is a
>> > > new
>> > > > > > event
>> > > > > > > > and
>> > > > > > > > > > > > > then
>> > > > > > > > > > > > as
>> > > > > > > > > > > > > for proctime the event time of the new event is
>> the
>> > > > current
>> > > > > > > time
>> > > > > > > > > > > > > when
>> > > > > > > > > > > > this
>> > > > > > > > > > > > > output event was created. If we would accept this
>> > > > > hypothesis
>> > > > > > > then
>> > > > > > > > > > > > > we
>> > > > > > > > > > > > would
>> > > > > > > > > > > > > not need the 2 time input fields to be
>> > carried/managed
>> > > > > > > > implicitly.
>> > > > > > > > > > > > > If someone needs further down the computation
>> > pipeline,
>> > > > > then
>> > > > > > in
>> > > > > > > > > > > > > the query
>> > > > > > > > > > > > they
>> > > > > > > > > > > > > would be selected explicitly from the input stream
>> > and
>> > > > > > > projected
>> > > > > > > > > > > > > in
>> > > > > > > > > > > some
>> > > > > > > > > > > > > fields to be carried (Select T1.rowtime as
>> > FormerTime1,
>> > > > > > > > T2.rowtime
>> > > > > > > > > > > > > as FormerTime2, .... JOIN T1, T2...)...but they
>> would
>> > > not
>> > > > > > have
>> > > > > > > > the
>> > > > > > > > > > > timestamp
>> > > > > > > > > > > > > logic
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > ..my 2 cents
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Dr. Radu Tudoran
>> > > > > > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D
>> > > Division
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > -----Original Message-----
>> > > > > > > > > > > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
>> > > > > > > > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
>> > > > > > > > > > > > > To: dev@flink.apache.org
>> > > > > > > > > > > > > Subject: [DISCUSS] Table API / SQL internal
>> timestamp
>> > > > > > handling
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Hi everybody,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I'd like to propose and discuss some changes in
>> the
>> > way
>> > > > how
>> > > > > > the
>> > > > > > > > > Table
>> > > > > > > > > > > API
>> > > > > > > > > > > > > / SQL internally handles timestamps.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The Table API is implemented on top of the
>> DataStream
>> > > > API.
>> > > > > > The
>> > > > > > > > > > > DataStream
>> > > > > > > > > > > > > API hides timestamps from users in order to ensure
>> > that
>> > > > > > > > timestamps
>> > > > > > > > > > and
>> > > > > > > > > > > > > watermarks are aligned. Instead users assign
>> > timestamps
>> > > > and
>> > > > > > > > > > watermarks
>> > > > > > > > > > > > once
>> > > > > > > > > > > > > (usually at the source or in a subsequent
>> operator)
>> > and
>> > > > let
>> > > > > > the
>> > > > > > > > > > system
>> > > > > > > > > > > > > handle the timestamps from there on. Timestamps
>> are
>> > > > stored
>> > > > > in
>> > > > > > > the
>> > > > > > > > > > > > timestamp
>> > > > > > > > > > > > > field of the StreamRecord which is a holder for
>> the
>> > > user
>> > > > > > record
>> > > > > > > > and
>> > > > > > > > > > the
>> > > > > > > > > > > > > timestamp. DataStream operators that depend on
>> time
>> > > > > > > > (time-windows,
>> > > > > > > > > > > > process
>> > > > > > > > > > > > > function, ...) access the timestamp from the
>> > > > StreamRecord.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > In contrast to the DataStream API, the Table API
>> and
>> > SQL
>> > > > are
>> > > > > > > aware
>> > > > > > > > > of
>> > > > > > > > > > > the
>> > > > > > > > > > > > > semantics of a query. I.e., we can analyze how
>> users
>> > > > access
>> > > > > > > > > > timestamps
>> > > > > > > > > > > > and
>> > > > > > > > > > > > > whether they are modified or not. Another
>> difference
>> > is
>> > > > > that
>> > > > > > > the
>> > > > > > > > > > > > timestamp
>> > > > > > > > > > > > > must be part of the schema of a table in order to
>> > have
>> > > > > > correct
>> > > > > > > > > query
>> > > > > > > > > > > > > semantics.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The current design to handle timestamps is as
>> > follows.
>> > > > The
>> > > > > > > Table
>> > > > > > > > > API
>> > > > > > > > > > > > > stores timestamps in the timestamp field of the
>> > > > > StreamRecord.
>> > > > > > > > > > > Therefore,
>> > > > > > > > > > > > > timestamps are detached from the remaining data
>> which
>> > > is
>> > > > > > stored
>> > > > > > > > in
>> > > > > > > > > > Row
>> > > > > > > > > > > > > objects. Hence, the physical representation of a
>> row
>> > is
>> > > > > > > different
>> > > > > > > > > > from
>> > > > > > > > > > > > its
>> > > > > > > > > > > > > logical representation. We introduced a
>> translation
>> > > layer
>> > > > > > > > > (RowSchema)
>> > > > > > > > > > > to
>> > > > > > > > > > > > > convert logical schema into physical schema. This
>> is
>> > > > > > necessary
>> > > > > > > > for
>> > > > > > > > > > > > > serialization or code generation when the logical
>> > plan
>> > > is
>> > > > > > > > > translated
>> > > > > > > > > > > > into a
>> > > > > > > > > > > > > physical execution plan. Processing-time
>> timestamps
>> > are
>> > > > > > > similarly
>> > > > > > > > > > > > handled.
>> > > > > > > > > > > > > They are not included in the physical schema and
>> > looked
>> > > > up
>> > > > > > when
>> > > > > > > > > > needed.
>> > > > > > > > > > > > > This design also requires that we need to
>> materialize
>> > > > > > > timestamps
>> > > > > > > > > when
>> > > > > > > > > > > > they
>> > > > > > > > > > > > > are accessed by expressions. Timestamp
>> > materialization
>> > > is
>> > > > > > done
>> > > > > > > > as a
>> > > > > > > > > > > > > pre-optimization step.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > While thinking about the implementation of the
>> > > event-time
>> > > > > > > > windowed
>> > > > > > > > > > > > > stream-stream join [1] I stumbled over the
>> question
>> > > which
>> > > > > > > > timestamp
>> > > > > > > > > > of
>> > > > > > > > > > > > both
>> > > > > > > > > > > > > input tables to forward. With the current design,
>> we
>> > > > could
>> > > > > > only
>> > > > > > > > > have
>> > > > > > > > > > a
>> > > > > > > > > > > > > single timestamp, so keeping both timestamps would
>> > not
>> > > be
>> > > > > > > > possible.
>> > > > > > > > > > The
>> > > > > > > > > > > > > choice of the timestamp would need to be
>> specified by
>> > > the
>> > > > > > query
>> > > > > > > > > > > otherwise
>> > > > > > > > > > > > > it would lack clear semantics. When executing the
>> > join,
>> > > > the
>> > > > > > > join
>> > > > > > > > > > > operator
>> > > > > > > > > > > > > would need to make sure that no late data is
>> emitted.
>> > > > This
>> > > > > > > would
>> > > > > > > > > only
>> > > > > > > > > > > > work
>> > > > > > > > > > > > > > if the operator was able to hold back watermarks [2].
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > With this information in mind, I'd like to discuss
>> > the
>> > > > > > > following
>> > > > > > > > > > > > proposal:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - We allow more than one event-time timestamp and
>> > store
>> > > > > them
>> > > > > > > > > directly
>> > > > > > > > > > > in
>> > > > > > > > > > > > > the Row
>> > > > > > > > > > > > > - The query operators ensure that the watermarks
>> are
>> > > > always
>> > > > > > > > behind
>> > > > > > > > > > all
>> > > > > > > > > > > > > event-time timestamps. With additional analysis we
>> > will
>> > > > be
>> > > > > > able
>> > > > > > > > to
>> > > > > > > > > > > > restrict
>> > > > > > > > > > > > > this to timestamps that are actually used as such.
>> > > > > > > > > > > > > - When a DataStream operator is time-based (e.g.,
>> a
>> > > > > > DataStream
>> > > > > > > > > > > > > time-windows), we inject an operator that copies
>> the
>> > > > > > timestamp
>> > > > > > > > from
>> > > > > > > > > > the
>> > > > > > > > > > > > Row
>> > > > > > > > > > > > > into the StreamRecord.
>> > > > > > > > > > > > > - We try to remove the distinction between logical
>> > and
>> > > > > > physical
>> > > > > > > > > > schema.
>> > > > > > > > > > > > > For event-time timestamps this is because we store
>> > them
>> > > > in
>> > > > > > the
>> > > > > > > > Row
>> > > > > > > > > > > > object,
>> > > > > > > > > > > > > for processing-time timestamps, we add a dummy
>> byte
>> > > > field.
>> > > > > > When
>> > > > > > > > > > > > accessing a
>> > > > > > > > > > > > > field of this type, the code generator injects the
>> > code
>> > > > to
>> > > > > > > fetch
>> > > > > > > > > the
>> > > > > > > > > > > > > timestamps.
>> > > > > > > > > > > > > - We might be able to get around the
>> pre-optimization
>> > > > time
>> > > > > > > > > > > > materialization
>> > > > > > > > > > > > > step.
>> > > > > > > > > > > > > - A join result would be able to keep both
>> > timestamps.
>> > > > The
>> > > > > > > > > watermark
>> > > > > > > > > > > > would
>> > > > > > > > > > > > > > be held back for both so both could be used in
>> > > subsequent
>> > > > > > > > > operations.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I admit, I haven't thought this completely
>> through.
>> > > > > > > > > > > > > However, the benefits of this design from my
>> point of
>> > > > view
>> > > > > > are:
>> > > > > > > > > > > > > - encoding of timestamps in Rows means that the
>> > logical
>> > > > > > schema
>> > > > > > > is
>> > > > > > > > > > equal
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > the physical schema
>> > > > > > > > > > > > > - no timestamp materialization
>> > > > > > > > > > > > > - support for multiple timestamps. Otherwise we
>> would
>> > > > need
>> > > > > to
>> > > > > > > > > expose
>> > > > > > > > > > > > > internal restrictions to the user which are hard
>> to
>> > > > > explain /
>> > > > > > > > > > > > communicate.
>> > > > > > > > > > > > > - no need to change any public interfaces at the
>> > > moment.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The drawbacks as far as I see them are:
>> > > > > > > > > > > > > - additional payload due to unused timestamp
>> field +
>> > > > > possibly
>> > > > > > > the
>> > > > > > > > > > > > > processing-time dummy field
>> > > > > > > > > > > > > - complete rework of the internal timestamp logic
>> > > > > (again...)
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Please let me know what you think,
>> > > > > > > > > > > > > Fabian
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
>> > > > > > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: [DISCUSS] Table API / SQL internal timestamp handling

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I think the implementation of the join operator should not depend on the
synchronization of the watermarks.
If we need to buffer a stream because we have to wait for future records
from the other stream to join them, then that's how it is. We cannot change
the semantics of the query.
It might be possible to apply backpressure to the source to delay one
stream, but this is a very delicate thing to do and can easily lead to
deadlocks and can have unpredictable side effects. So I would just consume
the stream and put it into the state.

From my point of view, timestamps cannot be "irreconcilable" because the
query semantics and input streams are given.
A join will always need to buffer some records. It is true that the join
operator might need to buffer a lot of records if the input streams and
join predicate are not aligned.
IMO, this is fine as long as the query has enough resources. Once it runs
out of resources, it simply fails and can be restarted with more resources
or adapted.
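
To make the buffering argument concrete, here is a minimal Python sketch.
It is not Flink code: IntervalJoinBuffer and its methods are hypothetical
names made up for illustration, timestamps are plain integers (minutes),
and a watermark w is assumed to promise that no row with ts <= w follows.
It only tracks timestamps and ignores the equi-join key.

```python
from dataclasses import dataclass, field


@dataclass
class IntervalJoinBuffer:
    """Toy buffer for a join with r.ts + lower <= l.ts <= r.ts + upper.

    Hypothetical helper for illustration only, not part of Flink.
    """
    lower: int
    upper: int
    left: list = field(default_factory=list)   # buffered l timestamps
    right: list = field(default_factory=list)  # buffered r timestamps

    def _matches(self, l_ts, r_ts):
        return r_ts + self.lower <= l_ts <= r_ts + self.upper

    def add_left(self, l_ts):
        """Buffer an l row and return the (l, r) pairs it joins with."""
        self.left.append(l_ts)
        return [(l_ts, r_ts) for r_ts in self.right if self._matches(l_ts, r_ts)]

    def add_right(self, r_ts):
        """Buffer an r row and return the (l, r) pairs it joins with."""
        self.right.append(r_ts)
        return [(l_ts, r_ts) for l_ts in self.left if self._matches(l_ts, r_ts)]

    def on_watermarks(self, wm_left, wm_right):
        """Evict rows that cannot join with any future row."""
        # The latest r row an l row can join has r.ts = l.ts - lower.
        self.left = [t for t in self.left if t - self.lower > wm_right]
        # The latest l row an r row can join has l.ts = r.ts + upper.
        self.right = [t for t in self.right if t + self.upper > wm_left]


# Predicate of the query: l.ts BETWEEN r.ts - 600 AND r.ts - 599 (minutes).

# Case (1): both watermarks advance together.
synced = IntervalJoinBuffer(lower=-600, upper=-599)
synced.add_left(0)
synced.on_watermarks(wm_left=100, wm_right=100)
still_buffered = list(synced.left)  # the l row waits for r rows up to ts 600
joined = synced.add_right(600)

# Case (2): the l stream lags the r stream by 10 hours.
offset = IntervalJoinBuffer(lower=-600, upper=-599)
offset.add_left(0)
offset.add_right(600)
offset.on_watermarks(wm_left=0, wm_right=600)
```

In case (1) the l row stays in the buffer until the r watermark reaches
ts 600. In case (2) it is evicted right away, and only the r row is kept
until the l watermark passes ts 1. The same eviction rule covers both
cases, so the buffer size follows from the predicate and the watermarks.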

Cheers, Fabian


2017-08-01 6:04 GMT+02:00 Xingcan Cui <xi...@gmail.com>:

> Hi Shaoxuan,
>
> I really appreciate your prompt reply. What you explained makes sense to
> me.
>
> There is only one point that I got some different ideas about "we have to
> buffer
> all the delta data between watermarks of two inputs".
>
> Consider the following SQL on joining two streams l and r:
>
> SELECT * FROM l, r
> WHERE l.name = r.name
> AND l.ts BETWEEN r.ts - INTERVAL '600' MINUTE
>     AND r.ts - INTERVAL '599' MINUTE;
>
> This query is valid since it holds both an equi-key and a time span
> restriction.
>
> There are two different situations to execute the query: (1) if the
> timestamps of
> l and r are synchronized, e.g., they both contain newly generated events, we
> must
> buffer the l stream for 600 minutes; and (2) if there exists a natural
> offset of the two
> streams, e.g., the r stream is newly generated while the l stream is sourced
> from
> an event queue generated 10 hours ago, it is unnecessary to buffer so much
> data.
>
> That raises the question. What if the timestamps of the two streams are
> essentially
> "irreconcilable"?
>
> Best,
> Xingcan
>
> On Mon, Jul 31, 2017 at 10:42 PM, Shaoxuan Wang <ws...@gmail.com>
> wrote:
>
> > Xingcan,
> > Watermark is the “estimate of completion”. User defines the waterMark
> based
> > on the best estimation per each input of when it pretty much sees all the
> > data. It is usually calculated by the event timestamp.
> > When we do a windowed join, we have to make sure the watermarks for both
> > inputs are received before emitting a window result at this watermark. If the
> > two inputs have large difference, say "one for today and the other one
> > for yesterday" as you pointed out, the watermark for the windowed join
> > operator is just yesterday.  I guess this is what Fabian means "In case
> of
> > a join, the smallest future timestamp depends on two fields and not just
> on
> > one." In the windowed join cases, we have to buffer all the delta data
> > between watermarks of two inputs. It is the user's responsibility (if
> > she/he wants to reduce the cost) to align watermarks of the stream
> sources
> > as much as possible.
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Mon, Jul 31, 2017 at 10:09 PM, Xingcan Cui <xi...@gmail.com>
> wrote:
> >
> > > Hi Fabian,
> > >
> > > I have a similar question to Jark's. Theoretically, the row times of two
> > > streams
> > > could be quite different, e.g., one for today and the other one for
> > > yesterday.
> > > How can we align them?
> > >
> > > Best,
> > > Xingcan
> > >
> > > On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fh...@gmail.com>
> > wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > yes, the handling of watermarks is very tricky. It is not directly
> > > related
> > > > to the proposal which is only about the representation of timestamps
> > but
> > > > becomes important for event-time joins.
> > > > We have a JIRA about an operator that is able to hold back watermarks
> > > [1].
> > > >
> > > > Roughly the idea is to track the smallest timestamp that will be
> > emitted
> > > in
> > > > the future and align the watermark to this timestamp.
> > > > For this we need to know the semantics of the operator (which
> timestamp
> > > > will be emitted in the future) but this will be given for relational
> > > > operators.
> > > > The new operator could emit a watermark whenever it received one.
> > > >
> > > > In case of a join, the smallest future timestamp depends on two
> fields
> > > and
> > > > not just on one.
> > > >
> > > > Best,
> > > > Fabian
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-7245
> > > >
> > > >
> > > > 2017-07-31 14:35 GMT+02:00 Jark Wu <ja...@apache.org>:
> > > >
> > > > > Hi,
> > > > >
> > > > > @Fabian, I read your proposal carefully again, and I'm big +1 to do
> > it.
> > > > The
> > > > > proposal can address the problem of how to forward both input
> > > > tables'
> > > > > rowtime of dual stream join (windowed/non-windowed). The additional
> > > > > payload drawback
> > > > > is acceptable.
> > > > >
> > > > > You mentioned that:
> > > > >
> > > > > > The query operators ensure that the watermarks are always behind
> > all
> > > > > > event-time timestamps. With additional analysis we will be able
> to
> > > > > restrict
> > > > > > this to timestamps that are actually used as such.
> > > > >
> > > > > I'm more curious about how we can define the watermark strategies
> in
> > > > order
> > > > > to make sure all timestamp columns are aligned to watermarks.
> > > Especially,
> > > > > when the watermark has been defined in the input DataStream.
> > > > >
> > > > > Bests,
> > > > > Jark Wu
> > > > >
> > > > >
> > > > > 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xi...@gmail.com>:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Thanks for the answers, @Fabian.
> > > > > >
> > > > > > @Jark, at first I also wanted the users to reassign the timestamp
> > > field
> > > > > > arbitrarily. However, that means we have to break the current
> "time
> > > > > system"
> > > > > > and create a new one. The blocked watermarks become meaningless
> and
> > > > > maybe a
> > > > > > new WatermarkAssigner should be provided. A little more strict
> > > > mechanism
> > > > > > would be only allowing to use the existing timestamp fields. It
> > > sounds
> > > > > > reasonable, but will bring an unnecessary barrier to stream/batch
> > > SQL,
> > > > > i.e.
> > > > > > some SQL works for the batch can not be executed in the stream
> > > > > environment.
> > > > > > I just wonder if we could automatically choose a field, which
> will
> > be
> > > > > used
> > > > > > in the following calculations. Not sure if it makes sense.
> > > > > >
> > > > > > @Shaoxuan @Radu, I totally agree that the "proctime" is the main
> > > block
> > > > > for
> > > > > > consolidating stream/batch SQL. Though from a general point of
> > view,
> > > it
> > > > > can
> > > > > > indicate the time to some extent, the randomness property
> > determines
> > > > that
> > > > > > it should never be used in time-sensitive applications. I always
> > > > believe
> > > > > in
> > > > > > that all the information used for query evaluation should be
> > acquired
> > > > > from
> > > > > > data itself.
> > > > > >
> > > > > > Best,
> > > > > > Xingcan
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <
> fhueske@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Shaoxuan,
> > > > > > >
> > > > > > > thanks for your comments. I agree with your comment:
> > > > > > >
> > > > > > > > The problem we used to have is that we have treated eventtime
> > > > column
> > > > > > as a
> > > > > > > special timestamp column.
> > > > > > >
> > > > > > > IMO, an event-time timestamp column is a regular column that is
> > > > aligned
> > > > > > > with the watermarks of the stream.
> > > > > > > In order to distinguish watermark aligned columns from others,
> we
> > > > need
> > > > > a
> > > > > > > special flag in the schema.
> > > > > > > When a timestamp column is modified and we cannot guarantee
> that
> > it
> > > > is
> > > > > > > still aligned with the watermarks, it must lose the special
> flag
> > > and
> > > > be
> > > > > > > treated like any other column.
> > > > > > >
> > > > > > > Regarding your comments:
> > > > > > > 1) I agree, that we can use Long in addition to Timestamp as a
> > > > > timestamp
> > > > > > > column. Since timestamp columns need to be comparable to
> > > watermarks
> > > > > > which
> > > > > > > are Longs, I don't see that other types would make sense. For
> > now,
> > > I
> > > > > > would
> > > > > > > keep the restriction that timestamps can only be of Timestamp
> > > type. I
> > > > > > > think, extending this to Long would be a follow-up issue to the
> > > > > changes I
> > > > > > > proposed here.
> > > > > > > 2) Relates to 1) and I agree. If we use a Long attribute as
> > > timestamp
> > > > > it
> > > > > > > should remain of type Long. For now I would keep converting it
> to
> > > > > > Timestamp
> > > > > > > and change that later.
> > > > > > > 3) Yes, timestamp columns must be aligned to watermarks. That's
> > > their
> > > > > > > primary characteristic. How to define watermark strategies is
> > > > > orthogonal
> > > > > > to
> > > > > > > this discussion, IMO.
> > > > > > > 4) From my point of view, proc-time is a purely virtual column
> > and
> > > > not
> > > > > > > related to an actual (data) column. However, it must be part of
> > the
> > > > > > schema
> > > > > > > and treated like any other attribute for a good user experience
> > and
> > > > SQL
> > > > > > > compliance. In order to be able to join two tables on
> processing
> > > > time,
> > > > > it
> > > > > > > must be possible to include a processing time column in the
> > schema
> > > > > > > definition of the table. Processing time queries can never
> > compute
> > > > the
> > > > > > same
> > > > > > > results as batch queries but their semantics should be aligned
> > with
> > > > > > > event-time queries.
> > > > > > >
> > > > > > > Best, Fabian
> > > > > > >
> > > > > > > 2017-07-27 9:47 GMT+02:00 Radu Tudoran <
> radu.tudoran@huawei.com
> > >:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > @Shaoxuan - thanks for the  remarks. I have a question
> > regarding
> > > > your
> > > > > > > > suggestion not to consider to create proctime window in a
> > regular
> > > > > > > column. I
> > > > > > > > think this would be useful though. First you might need to
> > carry
> > > > the
> > > > > > > > timestamp indicator of when the processing happened (for log
> > > > > purposes,
> > > > > > > > provenance, traceability ...). Secondly - I do not think it
> is
> > > > > > > > contradicting with the semantics in batch SQL as in SQL you
> > have
> > > > the
> > > > > > > > function "now()" ...which pretty much carry the same
> semantics
> > as
> > > > > > having
> > > > > > > a
> > > > > > > > function to mark the proctime and then projecting this into a
> > > > column.
> > > > > > If
> > > > > > > I
> > > > > > > > am not mistaken you can introduce in database columns the
> > result
> > > of
> > > > > > > calling
> > > > > > > > now().
> > > > > > > >
> > > > > > > >
> > > > > > > > Dr. Radu Tudoran
> > > > > > > > Staff Research Engineer - Big Data Expert
> > > > > > > > IT R&D Division
> > > > > > > >
> > > > > > > >
> > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > > German Research Center
> > > > > > > > Munich Office
> > > > > > > > Riesstrasse 25, 80992 München
> > > > > > > >
> > > > > > > > E-mail: radu.tudoran@huawei.com
> > > > > > > > Mobile: +49 15209084330
> > > > > > > > Telephone: +49 891588344173
> > > > > > > >
> > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > > > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> > > > 56063,
> > > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf,
> HRB
> > > > 56063,
> > > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > > This e-mail and its attachments contain confidential
> > information
> > > > from
> > > > > > > > HUAWEI, which is intended only for the person or entity whose
> > > > address
> > > > > > is
> > > > > > > > listed above. Any use of the information contained herein in
> > any
> > > > way
> > > > > > > > (including, but not limited to, total or partial disclosure,
> > > > > > > reproduction,
> > > > > > > > or dissemination) by persons other than the intended
> > recipient(s)
> > > > is
> > > > > > > > prohibited. If you receive this e-mail in error, please
> notify
> > > the
> > > > > > sender
> > > > > > > > by phone or email immediately and delete it!
> > > > > > > >
> > > > > > > >
> > > > > > > > -----Original Message-----
> > > > > > > > From: Shaoxuan Wang [mailto:shaoxuan@apache.org]
> > > > > > > > Sent: Thursday, July 27, 2017 6:00 AM
> > > > > > > > To: Dev
> > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
> > > handling
> > > > > > > >
> > > > > > > >  Hi Everyone,
> > > > > > > > I like this proposal. The problem we used to have is that we
> > have
> > > > > > treated
> > > > > > > > eventtime column as a special timestamp column. An eventtime
> > > column
> > > > > is
> > > > > > > > nothing special than all other regular columns, but with a
> > > certain
> > > > > flag
> > > > > > > > (eventtime-indicator) inferring that this column can be used
> as
> > > an
> > > > > > > eventtime
> > > > > > > > to decide when a bounded query can emit the final result by
> > > > comparing
> > > > > > > with
> > > > > > > > a concern associated waterMark.
> > > > > > > >
> > > > > > > > I have a few comments adding on top of this (they may have
> > > already
> > > > > been
> > > > > > > > addressed in the conversation — since It’s a long
> discussion, I
> > > may
> > > > > > miss
> > > > > > > > something):
> > > > > > > >
> > > > > > > >    1. While we remove timestamp column, we introduce
> > > > > > eventtime-indicator
> > > > > > > >    (we may already have this concept), it is only a flag can
> be
> > > > > applied
> > > > > > > for
> > > > > > > >    any column (note that some types may not be able to be
> used
> > as
> > > > > > > eventtime
> > > > > > > >    column), indicating if this column can be used as
> eventtime
> > or
> > > > > not.
> > > > > > > This
> > > > > > > >    flag is useful for validation and codeGen.
> > > > > > > >    2. A column that has been used as an eventtime, should not
> > > lose
> > > > > its
> > > > > > > own
> > > > > > > >    type. We should not cast all eventtime columns to the
> > timestamp
> > > > > type.
> > > > > > > For
> > > > > > > >    instance, if a column is a long type, it will keep as long
> > > type
> > > > > even
> > > > > > > if
> > > > > > > > a
> > > > > > > >    window aggregate has used it as a eventtime.
> > > > > > > >    3. Eventtime will only work well with some associated
> > > waterMark
> > > > > > > >    strategy. We may consider forcing user to provide a
> > waterMark
> > > > > logic
> > > > > > on
> > > > > > > >    his/her selected eventtime.
> > > > > > > >    4. For proctime, I hope we should not introduce
> > > > proctime-indicator
> > > > > > for
> > > > > > > >    regular column. Ideally we should not allow user to create
> > > > > proctime
> > > > > > > > window
> > > > > > > >    on regular column, as this is against the batch query
> > > semantics.
> > > > > > > > Therefore
> > > > > > > >    I suggest we should always introduce a proctime timestamp
> > > column
> > > > > for
> > > > > > > > users
> > > > > > > >    to create proctime window. And unlike eventtime, proctime
> > does
> > > > not
> > > > > > > need
> > > > > > > > any
> > > > > > > >    associated waterMark strategy, as there is no such out of
> > > order
> > > > > > issue
> > > > > > > > for
> > > > > > > >    the proctime.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Shaoxuan
> > > > > > > >
> > > > > > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <
> > > fhueske@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks everybody for the replies so far.
> > > > > > > > >
> > > > > > > > > Let me answer your questions and reply to your thoughts:
> > > > > > > > >
> > > > > > > > > Radu:
> > > > > > > > > ---
> > > > > > > > > First of all, although my proposal is motivated by a join
> > > > operator,
> > > > > > > > > this discussion is about timestamp handling, not about
> joins
> > in
> > > > > > > general.
> > > > > > > > >
> > > > > > > > > - The semantics of outer joins is to emit null and there is
> > no
> > > > way
> > > > > > > > > around that. This is not an issue for us. Actually, outer
> > joins
> > > > are
> > > > > > > > > supported by the batch SQL / Table API. It is true that
> outer
> > > > joins
> > > > > > > > > might result in null timestamps. Calcite will mark those
> > fields
> > > > as
> > > > > > > > > nullable and we should check that timestamps which are used
> > in
> > > > > > windows
> > > > > > > > or joins are not nullable.
> > > > > > > > > - The query has to explicitly specify which timestamp
> > attribute
> > > > to
> > > > > > use.
> > > > > > > > > Otherwise its semantics are not complete and it is
> invalid. A
> > > > > > > > > group-window that follows a join will reference a timestamp
> > > > > attribute
> > > > > > > > > and this will be used. The other timestamp might be
> projected
> > > > out.
> > > > > > > > > When a result with two timestamps is converted into a
> > > DataStream,
> > > > > the
> > > > > > > > > user has to decide. This could be done inside of the Table
> to
> > > > > > > > > DataStream conversion. If the Table has more than one valid
> > > > > > timestamp,
> > > > > > > > > the conversion will ask which timestamp to forward.
> > > > > > > > > - A proctime join should forward all proctime attributes of
> > the
> > > > > input
> > > > > > > > > tables. All will be the same, but that does not matter
> > because
> > > > they
> > > > > > > > > are either virtual or represented as 1 byte dummy
> attributes.
> > > > Also,
> > > > > > > > > unused ones will be automatically projected out anyway.
> > > > > > > > > - An event-time join should forward all event-time
> attributes
> > > of
> > > > > the
> > > > > > > > > input tables. Creating a new event-time attribute using
> > > > processing
> > > > > > > > > time makes event-time processing pointless and will give
> > > > completely
> > > > > > > > random results.
> > > > > > > > > Event-time is not about the "time an event is created" but
> > > about
> > > > a
> > > > > > > > > timestamp that is associated with an event. For example an
> > > order
> > > > > > event
> > > > > > > > > could have three timestamps: "orderTime", "shipTime", and
> > > > > > > "receiveTime".
> > > > > > > > > Each could be a valid event-time attribute.
> > > > > > > > >
> > > > > > > > > Jark:
> > > > > > > > > ---
> > > > > > > > > Thanks for the proposal. I think I understand what you want
> > to
> > > > > > achieve
> > > > > > > > > with this, but I think functions to instantiate time
> > attributes
> > > > are
> > > > > > > > > not necessary and would make things more complicated. The
> > point
> > > > of
> > > > > > > > > supporting multiple time attributes is to ensure that all
> of
> > > them
> > > > > are
> > > > > > > > > aligned with the watermarks. If we add a method
> > > > ROW_TIME(timestamp)
> > > > > > > > > and we don't know if the timestamp is aligned with the
> > > > watermarks.
> > > > > If
> > > > > > > > > that is not the case, the query won't be executed as
> > expected.
> > > > The
> > > > > > > > > issue of LEFT JOIN can easily be addressed by checking for
> > > > > > > > > nullability during optimization when an operator tries to
> > use
> > > > it.
> > > > > > > > >
> > > > > > > > > The beauty of supporting multiple timestamps is that a user
> > > does
> > > > > not
> > > > > > > > > have to care at all about timestamps (or timestamp
> functions)
> > > and
> > > > > > > > > watermarks. As long as the query uses a timestamp attribute
> > > that
> > > > > was
> > > > > > > > > originally declared as rowtime in a source table (and was
> not
> > > > > > modified
> > > > > > > > > afterwards), this is fine. Think of a cascade of three
> > windowed
> > > > > > joins:
> > > > > > > > > R - S - T - U, and you want to join S - T first. In that
> > case,
> > > > you
> > > > > > > > > need to preserve the timestamps of S and T in order to
> join R
> > > and
> > > > > U.
> > > > > > > > > From a relational algebra point of view, there is no reason
> > to
> > > > > have a
> > > > > > > > > limitation on how these attributes are accessed. Timestamps
> > are
> > > > > just
> > > > > > > > > regular fields of a record. The only restriction in the
> > context
> > > > of
> > > > > > > > > stream processing is that the watermark must be aligned
> with
> > > > > > > > > timestamps, i.e., follow all timestamps such that data is
> not
> > > > late
> > > > > > > > > according to any of the timestamps. This we can achieve and
> > > > handle
> > > > > > > > internally without the user having to worry about it.
> > > > > > > > >
> > > > > > > > > Xingcan:
> > > > > > > > > ---
> > > > > > > > > I think your questions are mostly implementation details
> and
> > > not
> > > > so
> > > > > > > > > much related to the original proposal of supporting
> multiple
> > > > > > > timestamps.
> > > > > > > > >
> > > > > > > > > My take on your questions is:
> > > > > > > > > 1. The rate at which watermarks are emitted is not
> important
> > > for
> > > > > the
> > > > > > > > > correctness of a query. However, it can affect the
> > performance,
> > > > > > > > > because each watermark is sent as a special record and it
> is
> > > > > > > > > broadcasted. My initial take would be to emit a new
> watermark
> > > > > > whenever
> > > > > > > > > the operator updated its watermark because usually, the
> > > operator
> > > > > > would
> > > > > > > > > have forwarded the old watermark.
> > > > > > > > > 2. I would say this is the responsibility of the operator
> > > because
> > > > > > > > > first it is not related to the semantics of the query and
> > > second
> > > > it
> > > > > > is
> > > > > > > > > an operator responsibility in the existing code as well.
> > > > > > > > >
> > > > > > > > > Jark 2:
> > > > > > > > > You are right, the query (or user) must decide on the
> > > event-time
> > > > > > > > > attribute to use. My main point is, it is much easier for
> the
> > > > user
> > > > > > > > > (and for us
> > > > > > > > > internally) if we internally track multiple timestamps.
> > Because
> > > > we
> > > > > do
> > > > > > > > > not have to prune the timestamp that will not be later used
> > > into
> > > > > the
> > > > > > > > join.
> > > > > > > > > Moreover, both timestamps might be used later (see join
> > > example,
> > > > > > which
> > > > > > > > > could be reordered of course). All we have to do is to
> ensure
> > > > that
> > > > > > all
> > > > > > > > > timestamps are aligned with the watermarks.
> > > > > > > > >
> > > > > > > > > Radu 2:
> > > > > > > > > IMO, time (or anything else that affects the semantics)
> > should
> > > > > never
> > > > > > > > > be decided by the system. When we would do that, a query is
> > not
> > > > > fully
> > > > > > > > > specified or, even worse, the way it is executed is
> > > semantically
> > > > > > > > > incorrect and produces arbitrary results.
> > > > > > > > >
> > > > > > > > > Time attributes should be specified in the source tables
> and
> > > then
> > > > > > > > > forwarded from there. So far I haven't seen an example
> where
> > > this
> > > > > > > > > would not be possible (within the semantics of relational
> > > > queries).
> > > > > > If
> > > > > > > > > we do that right, there won't be a need for explicit time
> > > > > management
> > > > > > > > > except for the definition of the initial timestamps which
> can
> > > be
> > > > > > > > > hidden in the table definition. As I said before, we (or
> the
> > > > > system)
> > > > > > > > > cannot decide on the timestamp because that would lead to
> > > > arbitrary
> > > > > > > > > results. Asking the user to do that would mean explicit
> time
> > > > > > > > > management which is also not desirable. I think my proposal
> > > gives
> > > > > > > > > users all options (timestamps) to choose from and the system
> > can
> > > > do
> > > > > > the
> > > > > > > > rest.
> > > > > > > > >
> > > > > > > > > Best, Fabian
> > > > > > > > >
> > > > > > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <
> > > radu.tudoran@huawei.com
> > > > >:
> > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > I just want to add that I was referring to NULL values
> not
> > > > > > > > > > specifically
> > > > > > > > > to
> > > > > > > > > > timefields but to the event itself. If you have the
> following
> > > > > > situation
> > > > > > > > > >
> > > > > > > > > > Stream 1:     .... |    event1   | ....
> > > > > > > > > > Stream 2:     .... |             | ....
> > > > > > > > > >
> > > > > > > > > > And you have a LEFT JOIN between stream 1 and stream 2
> (no
> > > > > > > > > > condition)...then you still need to emit (event1,null)
> ...
> > as
> > > > > this
> > > > > > > > > > is the behavior of left join. This is maybe a very simple
> > > > > > situation,
> > > > > > > > > > but the
> > > > > > > > > point
> > > > > > > > > > is that left joins and right joins can have situation
> when
> > > you
> > > > > have
> > > > > > > > > > elements only in the main stream and no element in the
> > right
> > > > > > stream.
> > > > > > > > > > And for this case you still need to emit.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Regarding whether time should be decided by system or
> > not...i
> > > > > think
> > > > > > > > > > the answer is it depends. I think the example from Jark
> is
> > > very
> > > > > > good
> > > > > > > > > > and
> > > > > > > > > shows
> > > > > > > > > > the need for some mechanisms to select/manage the time (I
> > > like
> > > > > the
> > > > > > > > > proposal
> > > > > > > > > > of having functions to insert the time in the output!).
> > > > However,
> > > > > if
> > > > > > > > > > a business analyst would write a query without explicit
> > time
> > > > > > > > > > management we still need to have some default behavior in
> > the
> > > > > > > > > > system. As per my initial proposal, I think  we need to
> > > decide
> > > > on
> > > > > > > > > > one timestamp field to carry (either a new one at the
> > moment
> > > of
> > > > > the
> > > > > > > > > > join) or the timestamp from the
> > > > > > > > > main
> > > > > > > > > > stream  (...although I am not sure which one is the main
> > > stream
> > > > > in
> > > > > > > > > > the
> > > > > > > > > case
> > > > > > > > > > of a full join:) )
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Dr. Radu Tudoran
> > > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D Division
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > > > > German Research Center
> > > > > > > > > > Munich Office
> > > > > > > > > > Riesstrasse 25, 80992 München
> > > > > > > > > >
> > > > > > > > > > E-mail: radu.tudoran@huawei.com
> > > > > > > > > > Mobile: +49 15209084330
> > > > > > > > > > Telephone: +49 891588344173
> > > > > > > > > >
> > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany,
> www.huawei.com
> > > > > > > > > > Registered Office: Düsseldorf, Register Court Düsseldorf,
> > HRB
> > > > > > 56063,
> > > > > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht
> Düsseldorf,
> > > HRB
> > > > > > 56063,
> > > > > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > > > > This e-mail and its attachments contain confidential
> > > > information
> > > > > > from
> > > > > > > > > > HUAWEI, which is intended only for the person or entity
> > whose
> > > > > > address
> > > > > > > > is
> > > > > > > > > > listed above. Any use of the information contained herein
> > in
> > > > any
> > > > > > way
> > > > > > > > > > (including, but not limited to, total or partial
> > disclosure,
> > > > > > > > > reproduction,
> > > > > > > > > > or dissemination) by persons other than the intended
> > > > recipient(s)
> > > > > > is
> > > > > > > > > > prohibited. If you receive this e-mail in error, please
> > > notify
> > > > > the
> > > > > > > > sender
> > > > > > > > > > by phone or email immediately and delete it!
> > > > > > > > > >
> > > > > > > > > > -----Original Message-----
> > > > > > > > > > From: Jark Wu [mailto:jark@apache.org]
> > > > > > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > > > > > > > To: dev@flink.apache.org
> > > > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > > > > > >
> > > > > > > > > > Hi Xingcan,
> > > > > > > > > >
> > > > > > > > > > IMO, I don't think the event time of join results can be
> > > > > > > > > > decided automatically by the system. Consider batch
> > > > > > > > > > tables: if users want an event-time window aggregation
> > > > > > > > > > after a join, they must specify the time field explicitly
> > > > > > > > > > (T1.rowtime, T2.rowtime, or a result computed from them).
> > > > > > > > > > So in the case of streaming tables, the system can't
> > > > > > > > > > automatically decide the time field for users either.
> > > > > > > > > >
> > > > > > > > > > Regarding the question you asked, I think we don't need
> > > > > > > > > > to change the watermark no matter whether we choose the
> > > > > > > > > > left rowtime, the right rowtime, or a combination of
> > > > > > > > > > them, because the watermark has already been aligned with
> > > > > > > > > > the rowtime in the source. Maybe I'm wrong about this;
> > > > > > > > > > please correct me if I'm missing something.
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Jark
> > > > > > > > > >
> > > > > > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingcanc@gmail.com>:
> > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > @Fabian, thanks for raising this.
> > > > > > > > > > >
> > > > > > > > > > > @Radu and Jark, personally I think the timestamp field
> > > > > > > > > > > is critical for query processing and thus should be
> > > > > > > > > > > declared as (or assumed to be) NOT NULL. In addition, I
> > > > > > > > > > > think the event-time semantics of the join results
> > > > > > > > > > > should be decided automatically by the system, i.e., we
> > > > > > > > > > > do not hand the decision over to users, so as to avoid
> > > > > > > > > > > unpredictable assignments.
> > > > > > > > > > >
> > > > > > > > > > > Generally speaking, consolidating different time fields
> > > > > > > > > > > is possible since all of them should ideally be
> > > > > > > > > > > monotonically increasing. From my point of view, the
> > > > > > > > > > > problem lies in (1) what the relationship between the
> > > > > > > > > > > old and new watermarks is: should it be a one-to-one
> > > > > > > > > > > mapping, or could the new watermarks skip some
> > > > > > > > > > > timestamps? And (2) who is in charge of emitting the
> > > > > > > > > > > blocked watermarks, the operator or the process
> > > > > > > > > > > function?
> > > > > > > > > > >
> > > > > > > > > > > I'd like to hear from you.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Xingcan
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <jark@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > Radu's concerns make sense to me, especially the null
> > > value
> > > > > > > > > > > > timestamp and multi-proctime.
> > > > > > > > > > > >
> > > > > > > > > > > > I also have something in mind. I would like to propose
> > > > > > > > > > > > some built-in time indicator functions, e.g.,
> > > > > > > > > > > > ROW_TIME(Timestamp ts) would generate an event-time
> > > > > > > > > > > > logical attribute and PROC_TIME() would generate a
> > > > > > > > > > > > processing-time logical attribute. This is similar to
> > > > > > > > > > > > TUMBLE_ROWTIME proposed in this PR:
> > > > > > > > > > > > https://github.com/apache/flink/pull/4199. These
> > > > > > > > > > > > functions could be used in any query, but there still
> > > > > > > > > > > > can't be more than one rowtime attribute or more than
> > > > > > > > > > > > one proctime attribute in a table schema.
> > > > > > > > > > > >
> > > > > > > > > > > > Both selected timestamp fields from a JOIN query will
> > > > > > > > > > > > be materialized. If someone needs event time further
> > > > > > > > > > > > down in the computation, they need to create a new
> > > > > > > > > > > > time attribute using the ROW_TIME(...) function. This
> > > > > > > > > > > > can also solve the null timestamp problem in LEFT
> > > > > > > > > > > > JOIN, because we can use a user-defined function to
> > > > > > > > > > > > combine the two rowtimes and make the result the
> > > > > > > > > > > > event-time attribute, e.g., SELECT
> > > > > > > > > > > > ROW_TIME(udf(T1.rowtime, T2.rowtime)) AS rowtime
> > > > > > > > > > > > FROM T1 JOIN T2 ...
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think this is an interesting discussion and I would
> > > > > > > > > > > > > like to add some issues and give some feedback.
> > > > > > > > > > > > >
> > > > > > > > > > > > > - For supporting the join we need to think not only
> > > > > > > > > > > > > about time but also about null values. For example,
> > > > > > > > > > > > > if you have a LEFT (or RIGHT) JOIN between items of
> > > > > > > > > > > > > 2 input streams and the secondary input is not
> > > > > > > > > > > > > available, you should still emit
> > > > > > > > > > > > > Row.of(event1, null)... As far as I know, null
> > > > > > > > > > > > > values do not work if you need to
> > > > > > > > > > > > > serialize/deserialize them in order to send them. So
> > > > > > > > > > > > > we should include this scenario in the discussion.
> > > > > > > > > > > > > - If we will have multiple timestamps in an (output)
> > > > > > > > > > > > > event, one question is how to select afterwards
> > > > > > > > > > > > > which is the primary time field on which to operate.
> > > > > > > > > > > > > When we describe a query we might be able to specify
> > > > > > > > > > > > > (or we get this implicitly if we implement the
> > > > > > > > > > > > > carry-on of the 2 timestamps) SELECT T1.rowtime,
> > > > > > > > > > > > > T2.rowtime ... But if the output of a query is the
> > > > > > > > > > > > > input of a new processing pipeline, do we then
> > > > > > > > > > > > > generally also support that the input has 2 time
> > > > > > > > > > > > > fields? How do we deal with the 2 input fields
> > > > > > > > > > > > > (maybe I am missing something) further down in the
> > > > > > > > > > > > > DataStream pipeline that we build based on the
> > > > > > > > > > > > > output?
> > > > > > > > > > > > > - For the case of proctime: do we need to carry 2
> > > > > > > > > > > > > proctimes (the proctimes of the incoming events from
> > > > > > > > > > > > > each stream), or 1 proctime (as we operate on
> > > > > > > > > > > > > proctime and the combination of the 2 inputs can be
> > > > > > > > > > > > > considered a new event, the current proctime on the
> > > > > > > > > > > > > machine can be considered the (proc)time reference
> > > > > > > > > > > > > for the output event), or 3 proctimes (the 2
> > > > > > > > > > > > > proctimes of the inputs plus the proctime when the
> > > > > > > > > > > > > new event was created)?
> > > > > > > > > > > > > - Similar to the point above, for event time (which
> > > > > > > > > > > > > I understand as the time when the event was
> > > > > > > > > > > > > created... or do we understand it as a time carried
> > > > > > > > > > > > > within the event?): when we join 2 events and output
> > > > > > > > > > > > > an event that is the result of the join, isn't this
> > > > > > > > > > > > > a new event, detached from the source/input events?
> > > > > > > > > > > > > I would tend to say it is a new event, and then, as
> > > > > > > > > > > > > for proctime, the event time of the new event is the
> > > > > > > > > > > > > current time when this output event was created. If
> > > > > > > > > > > > > we accept this hypothesis, then we would not need
> > > > > > > > > > > > > the 2 input time fields to be carried/managed
> > > > > > > > > > > > > implicitly. If someone needs them further down the
> > > > > > > > > > > > > computation pipeline, then they would be selected
> > > > > > > > > > > > > explicitly from the input streams in the query and
> > > > > > > > > > > > > projected into some fields to be carried (SELECT
> > > > > > > > > > > > > T1.rowtime AS FormerTime1, T2.rowtime AS
> > > > > > > > > > > > > FormerTime2, ... JOIN T1, T2 ...), but they would
> > > > > > > > > > > > > not have the timestamp logic.
> > > > > > > > > > > > >
> > > > > > > > > > > > > ..my 2 cents
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Dr. Radu Tudoran
> > > > > > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D
> > > Division
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > -----Original Message-----
> > > > > > > > > > > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > > > > > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > > > > > > > > > To: dev@flink.apache.org
> > > > > > > > > > > > > Subject: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi everybody,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to propose and discuss some changes in the
> > > > > > > > > > > > > way the Table API / SQL internally handles
> > > > > > > > > > > > > timestamps.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The Table API is implemented on top of the
> > > > > > > > > > > > > DataStream API. The DataStream API hides timestamps
> > > > > > > > > > > > > from users in order to ensure that timestamps and
> > > > > > > > > > > > > watermarks are aligned. Instead, users assign
> > > > > > > > > > > > > timestamps and watermarks once (usually at the
> > > > > > > > > > > > > source or in a subsequent operator) and let the
> > > > > > > > > > > > > system handle the timestamps from there on.
> > > > > > > > > > > > > Timestamps are stored in the timestamp field of the
> > > > > > > > > > > > > StreamRecord, which is a holder for the user record
> > > > > > > > > > > > > and the timestamp. DataStream operators that depend
> > > > > > > > > > > > > on time (time windows, process functions, ...)
> > > > > > > > > > > > > access the timestamp from the StreamRecord.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In contrast to the DataStream API, the Table API and
> > > > > > > > > > > > > SQL are aware of the semantics of a query, i.e., we
> > > > > > > > > > > > > can analyze how users access timestamps and whether
> > > > > > > > > > > > > they are modified or not. Another difference is that
> > > > > > > > > > > > > the timestamp must be part of the schema of a table
> > > > > > > > > > > > > in order to have correct query semantics.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The current design to handle timestamps is as
> > > > > > > > > > > > > follows. The Table API stores timestamps in the
> > > > > > > > > > > > > timestamp field of the StreamRecord. Therefore,
> > > > > > > > > > > > > timestamps are detached from the remaining data,
> > > > > > > > > > > > > which is stored in Row objects. Hence, the physical
> > > > > > > > > > > > > representation of a row is different from its
> > > > > > > > > > > > > logical representation. We introduced a translation
> > > > > > > > > > > > > layer (RowSchema) to convert the logical schema into
> > > > > > > > > > > > > the physical schema. This is necessary for
> > > > > > > > > > > > > serialization or code generation when the logical
> > > > > > > > > > > > > plan is translated into a physical execution plan.
> > > > > > > > > > > > > Processing-time timestamps are handled similarly:
> > > > > > > > > > > > > they are not included in the physical schema and are
> > > > > > > > > > > > > looked up when needed. This design also requires
> > > > > > > > > > > > > that we materialize timestamps when they are
> > > > > > > > > > > > > accessed by expressions. Timestamp materialization
> > > > > > > > > > > > > is done as a pre-optimization step.
> > > > > > > > > > > > >
> > > > > > > > > > > > > While thinking about the implementation of the
> > > > > > > > > > > > > event-time windowed stream-stream join [1], I
> > > > > > > > > > > > > stumbled over the question of which timestamp of the
> > > > > > > > > > > > > two input tables to forward. With the current
> > > > > > > > > > > > > design, we can only have a single timestamp, so
> > > > > > > > > > > > > keeping both timestamps would not be possible. The
> > > > > > > > > > > > > choice of the timestamp would need to be specified
> > > > > > > > > > > > > by the query; otherwise it would lack clear
> > > > > > > > > > > > > semantics. When executing the join, the join
> > > > > > > > > > > > > operator would need to make sure that no late data
> > > > > > > > > > > > > is emitted. This would only work if the operator was
> > > > > > > > > > > > > able to hold back watermarks [2].
> > > > > > > > > > > > >
> > > > > > > > > > > > > With this information in mind, I'd like to discuss
> > > > > > > > > > > > > the following proposal:
> > > > > > > > > > > > >
> > > > > > > > > > > > > - We allow more than one event-time timestamp and
> > > > > > > > > > > > > store them directly in the Row.
> > > > > > > > > > > > > - The query operators ensure that the watermarks are
> > > > > > > > > > > > > always behind all event-time timestamps. With
> > > > > > > > > > > > > additional analysis we will be able to restrict this
> > > > > > > > > > > > > to timestamps that are actually used as such.
> > > > > > > > > > > > > - When a DataStream operator is time-based (e.g., a
> > > > > > > > > > > > > DataStream time window), we inject an operator that
> > > > > > > > > > > > > copies the timestamp from the Row into the
> > > > > > > > > > > > > StreamRecord.
> > > > > > > > > > > > > - We try to remove the distinction between logical
> > > > > > > > > > > > > and physical schema. For event-time timestamps this
> > > > > > > > > > > > > is because we store them in the Row object; for
> > > > > > > > > > > > > processing-time timestamps, we add a dummy byte
> > > > > > > > > > > > > field. When accessing a field of this type, the code
> > > > > > > > > > > > > generator injects the code to fetch the timestamps.
> > > > > > > > > > > > > - We might be able to get around the
> > > > > > > > > > > > > pre-optimization time materialization step.
> > > > > > > > > > > > > - A join result would be able to keep both
> > > > > > > > > > > > > timestamps. The watermark would be held back for
> > > > > > > > > > > > > both, so both could be used in subsequent
> > > > > > > > > > > > > operations.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I admit, I haven't thought this completely through.
> > > > > > > > > > > > > However, the benefits of this design from my point
> > > > > > > > > > > > > of view are:
> > > > > > > > > > > > > - encoding of timestamps in Rows means that the
> > > > > > > > > > > > > logical schema is equal to the physical schema
> > > > > > > > > > > > > - no timestamp materialization
> > > > > > > > > > > > > - support for multiple timestamps. Otherwise we
> > > > > > > > > > > > > would need to expose internal restrictions to the
> > > > > > > > > > > > > user which are hard to explain / communicate.
> > > > > > > > > > > > > - no need to change any public interfaces at the
> > > > > > > > > > > > > moment.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The drawbacks as far as I see them are:
> > > > > > > > > > > > > - additional payload due to unused timestamp fields
> > > > > > > > > > > > > + possibly the processing-time dummy field
> > > > > > > > > > > > > - complete rework of the internal timestamp logic
> > > > > > > > > > > > > (again...)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please let me know what you think,
> > > > > > > > > > > > > Fabian
> > > > > > > > > > > > >
> > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
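
For reference, the idea quoted above that a join keeps both timestamps and holds the watermark back for both can be sketched as follows. This is a rough Python model under stated assumptions, not Flink's implementation: `JoinWatermarkTracker`, `max_time_delta` and `on_watermark` are hypothetical names, and the sketch assumes the join predicate bounds the distance between the two timestamps of any result row by `max_time_delta`.

```python
class JoinWatermarkTracker:
    """Tracks the watermark a two-input join may forward (illustrative only)."""

    def __init__(self, max_time_delta: int) -> None:
        # Assumption: |left.ts - right.ts| <= max_time_delta for every joined row.
        self.max_time_delta = max_time_delta
        self.left_wm = float("-inf")
        self.right_wm = float("-inf")

    def on_watermark(self, side: str, watermark: int) -> float:
        """Record a watermark from one input and return the output watermark."""
        if side == "left":
            self.left_wm = max(self.left_wm, watermark)    # never move backwards
        elif side == "right":
            self.right_wm = max(self.right_wm, watermark)  # never move backwards
        else:
            raise ValueError(f"unknown input side: {side!r}")
        return self.output_watermark()

    def output_watermark(self) -> float:
        # A future result contains at least one row newer than the combined
        # input watermark; its partner is at most max_time_delta older. Holding
        # the watermark back by that amount keeps it behind both timestamps.
        return min(self.left_wm, self.right_wm) - self.max_time_delta


tracker = JoinWatermarkTracker(max_time_delta=600)
tracker.on_watermark("left", 1000)          # right input not seen yet
print(tracker.on_watermark("right", 900))   # 300
```

The output watermark follows the slower input and is delayed by the largest allowed gap, which is the price of keeping both timestamps usable in subsequent time-based operations.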