You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Xingcan Cui <xi...@gmail.com> on 2017/04/11 03:30:35 UTC

Question about the process order in stream aggregate

Hi all,

I run some tests for stream aggregation on rows. The data stream is simply
registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime()
rows between unbounded preceding and current row from orderA).

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order
for them can always be preserved.

Thanks,
Xingcan

Re: Question about the process order in stream aggregate

Posted by Xingcan Cui <xi...@gmail.com>.
Hi,

@Radu @Stefano, sorry that I misunderstood it before. We considered the
problem from different viewpoints. I agree that (ingestion) timestamp
injection could be a good solution for this problem in some scenarios.
Thanks.

@Fabian, thanks for your explanation. That makes sense.

Best,
Xingcan

On Thu, Apr 13, 2017 at 2:41 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Xingcan,
>
> the 0L timestamp literal is an artifact of how the Calcite query is
> translated by Flink.
> It represents the value of the procTime() function that is logically used
> to sort the data. Calcite expects this attribute in the schema but Flink's
> OVER operator actually processes the data based on the local wallclock time
> of the operator.
>
> So this is an unnecessary overhead at the moment, which hopefully will be
> resolved before the 1.3 release.
>
> Best, Fabian
>
> 2017-04-12 9:45 GMT+02:00 Stefano Bortoli <st...@huawei.com>:
>
> > I'm afraid that to keep order either you have to process it in a serial
> > way (parallelism 1), or provide an element that allows to sort the
> objects
> > when these are processed in parallel (i.e. rowTime). When you distribute
> > the computation, as Fabian explained, you get a round-robin assignment to
> > the different process functions, which may not respect the original input
> > order in the output.
> >
> > ProcessTime means that you don't care much about time as a sorting
> > reference for the computation of the result.
> >
> > What Radu suggested is to inject the timestamp in your dataStream before
> > processing, and then use rowTime semantics. It won't be "real row time"
> > because your function will inject the timestamp of "arrival", but it will
> > produce sorted output as you "order by rowTime". Hope it helps.
> >
> > Best,
> > Stefano
> >
> > -----Original Message-----
> > From: Xingcan Cui [mailto:xingcanc@gmail.com]
> > Sent: Wednesday, April 12, 2017 8:11 AM
> > To: dev@flink.apache.org
> > Subject: Re: Question about the process order in stream aggregate
> >
> > Hi everybody,
> >
> > thank you all for your help.
> >
> > @Fabian I also check the DataStream that translated from the query and
> try
> > to figure out what happens in each step. The results are as follows
> > (correct me please if there's something wrong):
> >
> > Source -> Map (Order to Row3) -> FlatMap (do project and extract
> > timestamp?) -> Partition (partition by product) ->BoundedOverAggregate
> > (aggregate) -> FlatMap (Row5 to Row2) -> Sink
> >
> > @Stefano. It's indeed unable to keep the order unless we set parallelism
> > of the first MapFunc to 1 (which is unpractical) or execute the partition
> > step in advance (seems to be unpractical too).
> >
> > Anyway, the procTime itself is actually a "blurred concept" that full of
> > uncertainty, right? Now I think it's better to use rowTime instead if the
> > application need order preserving.
> >
> > @Radu, the assignTimestampsAndWatermarks method seems to be useless,
> maybe
> > it only affects the rowTime?
> >
> > There's another question. I find the following code in the generated
> > FlatMap function (step 3 project and extract timestamp):
> >
> > ...
> > java.sql.Timestamp result$16;
> > if (false) {
> >     result$16 = null;
> > }
> > else {
> >     result$16 =
> > org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(0L);
> > }
> >
> > if (false) {
> >     out.setField(2, null);
> > }
> > else {
> >     out.setField(2, result$16);
> > }
> > ...
> >
> > Could you please help me explain what's the 0L timestamp mean?
> >
> > Best,
> > Xingcan
> >
> > On Tue, Apr 11, 2017 at 8:40 PM, Radu Tudoran <ra...@huawei.com>
> > wrote:
> >
> > > Hi Xingcan,
> > >
> > > If you need to guarantee the order also in the case of procTime a
> > > trick that you can do is to set the working time of the env to
> > > processing time and to assign the proctime to the incoming stream. You
> > can do this via .
> > > assignTimestampsAndWatermarks(new ...) And override override def
> > > extractTimestamp(
> > >       element: type...,
> > >       previousElementTimestamp: Long): Long = {
> > >       System.currentTimeMillis()
> > >     }
> > >
> > > Alternatively you can play around with the stream source and control
> > > the time when the events come
> > >
> > > Dr. Radu Tudoran
> > > Senior Research Engineer - Big Data Expert IT R&D Division
> > >
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > German Research Center
> > > Munich Office
> > > Riesstrasse 25, 80992 München
> > >
> > > E-mail: radu.tudoran@huawei.com
> > > Mobile: +49 15209084330
> > > Telephone: +49 891588344173
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > This e-mail and its attachments contain confidential information from
> > > HUAWEI, which is intended only for the person or entity whose address
> is
> > > listed above. Any use of the information contained herein in any way
> > > (including, but not limited to, total or partial disclosure,
> > reproduction,
> > > or dissemination) by persons other than the intended recipient(s) is
> > > prohibited. If you receive this e-mail in error, please notify the
> sender
> > > by phone or email immediately and delete it!
> > >
> > >
> > > -----Original Message-----
> > > From: fhueske@gmail.com [mailto:fhueske@gmail.com]
> > > Sent: Tuesday, April 11, 2017 2:24 PM
> > > To: Stefano Bortoli; dev@flink.apache.org
> > > Subject: AW: Question about the process order in stream aggregate
> > >
> > > Resending to dev@f.a.o
> > >
> > > Hi Xingcan,
> > >
> > > This is expected behavior. In general, is not possible to guarantee
> > > results for processing time.
> > >
> > > Your query is translated as follows:
> > >
> > > CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n)
> > > -fwd-> MapFunc(n) -fwd-> Sink(n)
> > >
> > > The order of records is changed because of the connection between
> source
> > > and first map function. Here, records are distributed round robin to
> > > increase the parallelism from 1 to n. The parallel instances of map
> might
> > > forward the records in different order to the ProcessFunction that
> > computes
> > > the aggregation.
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > >
> > > Von: Stefano Bortoli
> > > Gesendet: Dienstag, 11. April 2017 14:10
> > > An: dev@flink.apache.org
> > > Betreff: RE: Question about the process order in stream aggregate
> > >
> > > Hi Xingcan,
> > >
> > > Are you using parallelism 1 for the test?  procTime semantics deals
> with
> > > the objects as they loaded in the operators. It could be the
> co-occuring
> > > partitioned events (in the same MS time frame) are processed in
> parallel
> > > and then the output is produced in different order.
> > >
> > > I suggest you to have a look at the integration test to verify that the
> > > configuration of your experiment is correct.
> > >
> > > Best,
> > > Stefano
> > >
> > > -----Original Message-----
> > > From: Xingcan Cui [mailto:xingcanc@gmail.com]
> > > Sent: Tuesday, April 11, 2017 5:31 AM
> > > To: dev@flink.apache.org
> > > Subject: Question about the process order in stream aggregate
> > >
> > > Hi all,
> > >
> > > I run some tests for stream aggregation on rows. The data stream is
> > simply
> > > registered as
> > >
> > > val orderA: DataStream[Order] = env.fromCollection(Seq(
> > >       Order(1L, "beer", 1),
> > >       Order(2L, "diaper", 2),
> > >       Order(3L, "diaper", 3),
> > >       Order(4L, "rubber", 4)))
> > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),
> > >
> > > and the SQL is defined as
> > >
> > > select product, sum(amount) over (partition by product order by
> > procTime()
> > > rows between unbounded preceding and current row from orderA).
> > >
> > > My expected output should be
> > >
> > > 2> Result(beer,1)
> > > 2> Result(diaper,2)
> > > 1> Result(rubber,4)
> > > 2> Result(diaper,5).
> > >
> > > However, sometimes I get the following output
> > >
> > > 2> Result(beer,1)
> > > 2> Result(diaper,3)
> > > 1> Result(rubber,4)
> > > 2> Result(diaper,5).
> > >
> > > It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper",
> > 3)"
> > > are out of order. Is that normal?
> > >
> > > BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the
> > > order for them can always be preserved.
> > >
> > > Thanks,
> > > Xingcan
> > >
> > >
> >
>

Re: Question about the process order in stream aggregate

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Xingcan,

the 0L timestamp literal is an artifact of how the Calcite query is
translated by Flink.
It represents the value of the procTime() function that is logically used
to sort the data. Calcite expects this attribute in the schema but Flink's
OVER operator actually processes the data based on the local wallclock time
of the operator.

So this is an unnecessary overhead at the moment, which hopefully will be
resolved before the 1.3 release.

Best, Fabian

2017-04-12 9:45 GMT+02:00 Stefano Bortoli <st...@huawei.com>:

> I'm afraid that to keep order either you have to process it in a serial
> way (parallelism 1), or provide an element that allows to sort the objects
> when these are processed in parallel (i.e. rowTime). When you distribute
> the computation, as Fabian explained, you get a round-robin assignment to
> the different process functions, which may not respect the original input
> order in the output.
>
> ProcessTime means that you don't care much about time as a sorting
> reference for the computation of the result.
>
> What Radu suggested is to inject the timestamp in your dataStream before
> processing, and then use rowTime semantics. It won't be "real row time"
> because your function will inject the timestamp of "arrival", but it will
> produce sorted output as you "order by rowTime". Hope it helps.
>
> Best,
> Stefano
>
> -----Original Message-----
> From: Xingcan Cui [mailto:xingcanc@gmail.com]
> Sent: Wednesday, April 12, 2017 8:11 AM
> To: dev@flink.apache.org
> Subject: Re: Question about the process order in stream aggregate
>
> Hi everybody,
>
> thank you all for your help.
>
> @Fabian I also check the DataStream that translated from the query and try
> to figure out what happens in each step. The results are as follows
> (correct me please if there's something wrong):
>
> Source -> Map (Order to Row3) -> FlatMap (do project and extract
> timestamp?) -> Partition (partition by product) ->BoundedOverAggregate
> (aggregate) -> FlatMap (Row5 to Row2) -> Sink
>
> @Stefano. It's indeed unable to keep the order unless we set parallelism
> of the first MapFunc to 1 (which is unpractical) or execute the partition
> step in advance (seems to be unpractical too).
>
> Anyway, the procTime itself is actually a "blurred concept" that full of
> uncertainty, right? Now I think it's better to use rowTime instead if the
> application need order preserving.
>
> @Radu, the assignTimestampsAndWatermarks method seems to be useless, maybe
> it only affects the rowTime?
>
> There's another question. I find the following code in the generated
> FlatMap function (step 3 project and extract timestamp):
>
> ...
> java.sql.Timestamp result$16;
> if (false) {
>     result$16 = null;
> }
> else {
>     result$16 =
> org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(0L);
> }
>
> if (false) {
>     out.setField(2, null);
> }
> else {
>     out.setField(2, result$16);
> }
> ...
>
> Could you please help me explain what's the 0L timestamp mean?
>
> Best,
> Xingcan
>
> On Tue, Apr 11, 2017 at 8:40 PM, Radu Tudoran <ra...@huawei.com>
> wrote:
>
> > Hi Xingcan,
> >
> > If you need to guarantee the order also in the case of procTime a
> > trick that you can do is to set the working time of the env to
> > processing time and to assign the proctime to the incoming stream. You
> can do this via .
> > assignTimestampsAndWatermarks(new ...) And override override def
> > extractTimestamp(
> >       element: type...,
> >       previousElementTimestamp: Long): Long = {
> >       System.currentTimeMillis()
> >     }
> >
> > Alternatively you can play around with the stream source and control
> > the time when the events come
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert IT R&D Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > German Research Center
> > Munich Office
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudoran@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > This e-mail and its attachments contain confidential information from
> > HUAWEI, which is intended only for the person or entity whose address is
> > listed above. Any use of the information contained herein in any way
> > (including, but not limited to, total or partial disclosure,
> reproduction,
> > or dissemination) by persons other than the intended recipient(s) is
> > prohibited. If you receive this e-mail in error, please notify the sender
> > by phone or email immediately and delete it!
> >
> >
> > -----Original Message-----
> > From: fhueske@gmail.com [mailto:fhueske@gmail.com]
> > Sent: Tuesday, April 11, 2017 2:24 PM
> > To: Stefano Bortoli; dev@flink.apache.org
> > Subject: AW: Question about the process order in stream aggregate
> >
> > Resending to dev@f.a.o
> >
> > Hi Xingcan,
> >
> > This is expected behavior. In general, is not possible to guarantee
> > results for processing time.
> >
> > Your query is translated as follows:
> >
> > CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n)
> > -fwd-> MapFunc(n) -fwd-> Sink(n)
> >
> > The order of records is changed because of the connection between source
> > and first map function. Here, records are distributed round robin to
> > increase the parallelism from 1 to n. The parallel instances of map might
> > forward the records in different order to the ProcessFunction that
> computes
> > the aggregation.
> >
> > Hope this helps,
> > Fabian
> >
> >
> > Von: Stefano Bortoli
> > Gesendet: Dienstag, 11. April 2017 14:10
> > An: dev@flink.apache.org
> > Betreff: RE: Question about the process order in stream aggregate
> >
> > Hi Xingcan,
> >
> > Are you using parallelism 1 for the test?  procTime semantics deals with
> > the objects as they loaded in the operators. It could be the co-occuring
> > partitioned events (in the same MS time frame) are processed in parallel
> > and then the output is produced in different order.
> >
> > I suggest you to have a look at the integration test to verify that the
> > configuration of your experiment is correct.
> >
> > Best,
> > Stefano
> >
> > -----Original Message-----
> > From: Xingcan Cui [mailto:xingcanc@gmail.com]
> > Sent: Tuesday, April 11, 2017 5:31 AM
> > To: dev@flink.apache.org
> > Subject: Question about the process order in stream aggregate
> >
> > Hi all,
> >
> > I run some tests for stream aggregation on rows. The data stream is
> simply
> > registered as
> >
> > val orderA: DataStream[Order] = env.fromCollection(Seq(
> >       Order(1L, "beer", 1),
> >       Order(2L, "diaper", 2),
> >       Order(3L, "diaper", 3),
> >       Order(4L, "rubber", 4)))
> > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),
> >
> > and the SQL is defined as
> >
> > select product, sum(amount) over (partition by product order by
> procTime()
> > rows between unbounded preceding and current row from orderA).
> >
> > My expected output should be
> >
> > 2> Result(beer,1)
> > 2> Result(diaper,2)
> > 1> Result(rubber,4)
> > 2> Result(diaper,5).
> >
> > However, sometimes I get the following output
> >
> > 2> Result(beer,1)
> > 2> Result(diaper,3)
> > 1> Result(rubber,4)
> > 2> Result(diaper,5).
> >
> > It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper",
> 3)"
> > are out of order. Is that normal?
> >
> > BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the
> > order for them can always be preserved.
> >
> > Thanks,
> > Xingcan
> >
> >
>

RE: Question about the process order in stream aggregate

Posted by Stefano Bortoli <st...@huawei.com>.
I'm afraid that to keep order either you have to process it in a serial way (parallelism 1), or provide an element that allows to sort the objects when these are processed in parallel (i.e. rowTime). When you distribute the computation, as Fabian explained, you get a round-robin assignment to the different process functions, which may not respect the original input order in the output.

ProcessTime means that you don't care much about time as a sorting reference for the computation of the result. 

What Radu suggested is to inject the timestamp in your dataStream before processing, and then use rowTime semantics. It won't be "real row time" because your function will inject the timestamp of "arrival", but it will produce sorted output as you "order by rowTime". Hope it helps.

Best,
Stefano

-----Original Message-----
From: Xingcan Cui [mailto:xingcanc@gmail.com] 
Sent: Wednesday, April 12, 2017 8:11 AM
To: dev@flink.apache.org
Subject: Re: Question about the process order in stream aggregate

Hi everybody,

thank you all for your help.

@Fabian I also check the DataStream that translated from the query and try to figure out what happens in each step. The results are as follows (correct me please if there's something wrong):

Source -> Map (Order to Row3) -> FlatMap (do project and extract
timestamp?) -> Partition (partition by product) ->BoundedOverAggregate
(aggregate) -> FlatMap (Row5 to Row2) -> Sink

@Stefano. It's indeed unable to keep the order unless we set parallelism of the first MapFunc to 1 (which is unpractical) or execute the partition step in advance (seems to be unpractical too).

Anyway, the procTime itself is actually a "blurred concept" that full of uncertainty, right? Now I think it's better to use rowTime instead if the application need order preserving.

@Radu, the assignTimestampsAndWatermarks method seems to be useless, maybe it only affects the rowTime?

There's another question. I find the following code in the generated FlatMap function (step 3 project and extract timestamp):

...
java.sql.Timestamp result$16;
if (false) {
    result$16 = null;
}
else {
    result$16 =
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(0L);
}

if (false) {
    out.setField(2, null);
}
else {
    out.setField(2, result$16);
}
...

Could you please help me explain what's the 0L timestamp mean?

Best,
Xingcan

On Tue, Apr 11, 2017 at 8:40 PM, Radu Tudoran <ra...@huawei.com>
wrote:

> Hi Xingcan,
>
> If you need to guarantee the order also in the case of procTime a 
> trick that you can do is to set the working time of the env to 
> processing time and to assign the proctime to the incoming stream. You can do this via .
> assignTimestampsAndWatermarks(new ...) And override override def 
> extractTimestamp(
>       element: type...,
>       previousElementTimestamp: Long): Long = {
>       System.currentTimeMillis()
>     }
>
> Alternatively you can play around with the stream source and control 
> the time when the events come
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert IT R&D Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudoran@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
> -----Original Message-----
> From: fhueske@gmail.com [mailto:fhueske@gmail.com]
> Sent: Tuesday, April 11, 2017 2:24 PM
> To: Stefano Bortoli; dev@flink.apache.org
> Subject: AW: Question about the process order in stream aggregate
>
> Resending to dev@f.a.o
>
> Hi Xingcan,
>
> This is expected behavior. In general, is not possible to guarantee
> results for processing time.
>
> Your query is translated as follows:
>
> CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n)
> -fwd-> MapFunc(n) -fwd-> Sink(n)
>
> The order of records is changed because of the connection between source
> and first map function. Here, records are distributed round robin to
> increase the parallelism from 1 to n. The parallel instances of map might
> forward the records in different order to the ProcessFunction that computes
> the aggregation.
>
> Hope this helps,
> Fabian
>
>
> Von: Stefano Bortoli
> Gesendet: Dienstag, 11. April 2017 14:10
> An: dev@flink.apache.org
> Betreff: RE: Question about the process order in stream aggregate
>
> Hi Xingcan,
>
> Are you using parallelism 1 for the test?  procTime semantics deals with
> the objects as they loaded in the operators. It could be the co-occuring
> partitioned events (in the same MS time frame) are processed in parallel
> and then the output is produced in different order.
>
> I suggest you to have a look at the integration test to verify that the
> configuration of your experiment is correct.
>
> Best,
> Stefano
>
> -----Original Message-----
> From: Xingcan Cui [mailto:xingcanc@gmail.com]
> Sent: Tuesday, April 11, 2017 5:31 AM
> To: dev@flink.apache.org
> Subject: Question about the process order in stream aggregate
>
> Hi all,
>
> I run some tests for stream aggregation on rows. The data stream is simply
> registered as
>
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(1L, "beer", 1),
>       Order(2L, "diaper", 2),
>       Order(3L, "diaper", 3),
>       Order(4L, "rubber", 4)))
> tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),
>
> and the SQL is defined as
>
> select product, sum(amount) over (partition by product order by procTime()
> rows between unbounded preceding and current row from orderA).
>
> My expected output should be
>
> 2> Result(beer,1)
> 2> Result(diaper,2)
> 1> Result(rubber,4)
> 2> Result(diaper,5).
>
> However, sometimes I get the following output
>
> 2> Result(beer,1)
> 2> Result(diaper,3)
> 1> Result(rubber,4)
> 2> Result(diaper,5).
>
> It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
> are out of order. Is that normal?
>
> BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the
> order for them can always be preserved.
>
> Thanks,
> Xingcan
>
>

Re: Question about the process order in stream aggregate

Posted by Xingcan Cui <xi...@gmail.com>.
Hi everybody,

thank you all for your help.

@Fabian I also check the DataStream that translated from the query and try
to figure out what happens in each step. The results are as follows
(correct me please if there's something wrong):

Source -> Map (Order to Row3) -> FlatMap (do project and extract
timestamp?) -> Partition (partition by product) ->BoundedOverAggregate
(aggregate) -> FlatMap (Row5 to Row2) -> Sink

@Stefano. It's indeed unable to keep the order unless we set parallelism of
the first MapFunc to 1 (which is unpractical) or execute the partition step
in advance (seems to be unpractical too).

Anyway, the procTime itself is actually a "blurred concept" that full of
uncertainty, right? Now I think it's better to use rowTime instead if the
application need order preserving.

@Radu, the assignTimestampsAndWatermarks method seems to be useless, maybe
it only affects the rowTime?

There's another question. I find the following code in the generated
FlatMap function (step 3 project and extract timestamp):

...
java.sql.Timestamp result$16;
if (false) {
    result$16 = null;
}
else {
    result$16 =
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(0L);
}

if (false) {
    out.setField(2, null);
}
else {
    out.setField(2, result$16);
}
...

Could you please help me explain what's the 0L timestamp mean?

Best,
Xingcan

On Tue, Apr 11, 2017 at 8:40 PM, Radu Tudoran <ra...@huawei.com>
wrote:

> Hi Xingcan,
>
> If you need to guarantee the order also in the case of procTime a trick
> that you can do is to set the working time of the env to processing time
> and to assign the proctime to the incoming stream. You can do this via .
> assignTimestampsAndWatermarks(new ...)
> And override
> override def extractTimestamp(
>       element: type...,
>       previousElementTimestamp: Long): Long = {
>       System.currentTimeMillis()
>     }
>
> Alternatively you can play around with the stream source and control the
> time when the events come
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudoran@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
> -----Original Message-----
> From: fhueske@gmail.com [mailto:fhueske@gmail.com]
> Sent: Tuesday, April 11, 2017 2:24 PM
> To: Stefano Bortoli; dev@flink.apache.org
> Subject: AW: Question about the process order in stream aggregate
>
> Resending to dev@f.a.o
>
> Hi Xingcan,
>
> This is expected behavior. In general, is not possible to guarantee
> results for processing time.
>
> Your query is translated as follows:
>
> CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n)
> -fwd-> MapFunc(n) -fwd-> Sink(n)
>
> The order of records is changed because of the connection between source
> and first map function. Here, records are distributed round robin to
> increase the parallelism from 1 to n. The parallel instances of map might
> forward the records in different order to the ProcessFunction that computes
> the aggregation.
>
> Hope this helps,
> Fabian
>
>
> Von: Stefano Bortoli
> Gesendet: Dienstag, 11. April 2017 14:10
> An: dev@flink.apache.org
> Betreff: RE: Question about the process order in stream aggregate
>
> Hi Xingcan,
>
> Are you using parallelism 1 for the test?  procTime semantics deals with
> the objects as they loaded in the operators. It could be the co-occuring
> partitioned events (in the same MS time frame) are processed in parallel
> and then the output is produced in different order.
>
> I suggest you to have a look at the integration test to verify that the
> configuration of your experiment is correct.
>
> Best,
> Stefano
>
> -----Original Message-----
> From: Xingcan Cui [mailto:xingcanc@gmail.com]
> Sent: Tuesday, April 11, 2017 5:31 AM
> To: dev@flink.apache.org
> Subject: Question about the process order in stream aggregate
>
> Hi all,
>
> I run some tests for stream aggregation on rows. The data stream is simply
> registered as
>
> val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(1L, "beer", 1),
>       Order(2L, "diaper", 2),
>       Order(3L, "diaper", 3),
>       Order(4L, "rubber", 4)))
> tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),
>
> and the SQL is defined as
>
> select product, sum(amount) over (partition by product order by procTime()
> rows between unbounded preceding and current row from orderA).
>
> My expected output should be
>
> 2> Result(beer,1)
> 2> Result(diaper,2)
> 1> Result(rubber,4)
> 2> Result(diaper,5).
>
> However, sometimes I get the following output
>
> 2> Result(beer,1)
> 2> Result(diaper,3)
> 1> Result(rubber,4)
> 2> Result(diaper,5).
>
> It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
> are out of order. Is that normal?
>
> BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the
> order for them can always be preserved.
>
> Thanks,
> Xingcan
>
>

RE: Question about the process order in stream aggregate

Posted by Radu Tudoran <ra...@huawei.com>.
Hi Xingcan,

If you need to guarantee the order also in the case of procTime a trick that you can do is to set the working time of the env to processing time and to assign the proctime to the incoming stream. You can do this via .assignTimestampsAndWatermarks(new ...)
And override 
override def extractTimestamp(
      element: type...,
      previousElementTimestamp: Long): Long = {
      System.currentTimeMillis()
    }

Alternatively you can play around with the stream source and control the time when the events come

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudoran@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang 
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!


-----Original Message-----
From: fhueske@gmail.com [mailto:fhueske@gmail.com] 
Sent: Tuesday, April 11, 2017 2:24 PM
To: Stefano Bortoli; dev@flink.apache.org
Subject: AW: Question about the process order in stream aggregate

Resending to dev@f.a.o

Hi Xingcan,

This is expected behavior. In general, is not possible to guarantee results for processing time.

Your query is translated as follows:

CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n) -fwd-> MapFunc(n) -fwd-> Sink(n)

The order of records is changed because of the connection between source and first map function. Here, records are distributed round robin to increase the parallelism from 1 to n. The parallel instances of map might forward the records in different order to the ProcessFunction that computes the aggregation. 

Hope this helps,
Fabian


Von: Stefano Bortoli
Gesendet: Dienstag, 11. April 2017 14:10
An: dev@flink.apache.org
Betreff: RE: Question about the process order in stream aggregate

Hi Xingcan,

Are you using parallelism 1 for the test?  procTime semantics deals with the objects as they loaded in the operators. It could be the co-occuring partitioned events (in the same MS time frame) are processed in parallel and then the output is produced in different order.

I suggest you to have a look at the integration test to verify that the configuration of your experiment is correct.

Best,
Stefano

-----Original Message-----
From: Xingcan Cui [mailto:xingcanc@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I run some tests for stream aggregation on rows. The data stream is simply registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows between unbounded preceding and current row from orderA).

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order for them can always be preserved.

Thanks,
Xingcan


AW: Question about the process order in stream aggregate

Posted by fh...@gmail.com.
Resending to dev@f.a.o

Hi Xingcan,

This is expected behavior. In general, is not possible to guarantee results for processing time.

Your query is translated as follows:

CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n) -fwd-> MapFunc(n) -fwd-> Sink(n)

The order of records is changed because of the connection between source and first map function. Here, records are distributed round robin to increase the parallelism from 1 to n. The parallel instances of map might forward the records in different order to the ProcessFunction that computes the aggregation. 

Hope this helps,
Fabian


Von: Stefano Bortoli
Gesendet: Dienstag, 11. April 2017 14:10
An: dev@flink.apache.org
Betreff: RE: Question about the process order in stream aggregate

Hi Xingcan,

Are you using parallelism 1 for the test?  procTime semantics deals with the objects as they loaded in the operators. It could be the co-occuring partitioned events (in the same MS time frame) are processed in parallel and then the output is produced in different order.

I suggest you to have a look at the integration test to verify that the configuration of your experiment is correct.

Best,
Stefano

-----Original Message-----
From: Xingcan Cui [mailto:xingcanc@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I run some tests for stream aggregation on rows. The data stream is simply registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows between unbounded preceding and current row from orderA).

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order for them can always be preserved.

Thanks,
Xingcan


RE: Question about the process order in stream aggregate

Posted by Stefano Bortoli <st...@huawei.com>.
Hi Xingcan,

Are you using parallelism 1 for the test?  procTime semantics deals with the objects as they loaded in the operators. It could be the co-occuring partitioned events (in the same MS time frame) are processed in parallel and then the output is produced in different order.

I suggest you to have a look at the integration test to verify that the configuration of your experiment is correct.

Best,
Stefano

-----Original Message-----
From: Xingcan Cui [mailto:xingcanc@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I run some tests for stream aggregation on rows. The data stream is simply registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows between unbounded preceding and current row from orderA).

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order for them can always be preserved.

Thanks,
Xingcan