Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2018/10/01 07:54:13 UTC

Re: About the retract of the calculation result of flink sql

Hi,

You also need to keep the parallelism in mind. If your downstream
operator or sink has a parallelism of 1 and your SQL query pipeline has
a higher parallelism, the retract results are rebalanced and may arrive
in the wrong order. For example, if you view the changelog in the SQL
Client, note that the built-in SQL Client sink always has a parallelism
of 1.
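
A minimal sketch of this effect, written as a plain Scala simulation and not
as Flink API code. The names `rebalance` and `drain` are hypothetical helpers
made up for illustration. It assumes messages are distributed round-robin
over two channels and that the single downstream task happens to read
channel 1 before channel 0, which is one of many possible interleavings:

```scala
object RebalanceDemo {
  // A retract-stream message: flag true = add, false = retract.
  type Msg = (Boolean, Int)

  // Round-robin distribution of the messages over n channels.
  def rebalance(msgs: Seq[Msg], n: Int): Seq[Seq[Msg]] =
    (0 until n).map { c =>
      msgs.zipWithIndex.collect { case (m, i) if i % n == c => m }
    }

  // One possible arrival order at a single downstream task: each channel
  // keeps its own order, but the channels are read in the given sequence.
  def drain(channels: Seq[Seq[Msg]], order: Seq[Int]): Seq[Msg] =
    order.flatMap(i => channels(i))

  def main(args: Array[String]): Unit = {
    val emitted = Seq((true, 1), (false, 1), (true, 3), (false, 3), (true, 6))
    val arrived = drain(rebalance(emitted, 2), Seq(1, 0))
    println(s"emitted: $emitted")
    println(s"arrived: $arrived") // retractions now precede their adds
  }
}
```

In this run the retraction `(false, 1)` reaches the sink before the
corresponding `(true, 1)`, even though it was emitted after it.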

Regards,
Timo



On 29.09.18 at 17:02, Hequn Cheng wrote:
> Hi clay,
>
> Are there any other lines after the last line in your picture? The 
> final result should be eventually consistent and correct.
>
> Your SQL contains a left join, a keyed group by, and a non-keyed 
> group by. Both the left join and the keyed group by send 
> retractions to the downstream non-keyed group by whenever there is an 
> update. These retraction messages make the result value fluctuate. 
> However, the final result will be correct.
> To get monotonic results, you can add another non-keyed group by with 
> max.
>
> Best, Hequn.
>
>
> On Sat, Sep 29, 2018 at 3:47 PM clay4444 <clay4megtr@gmail.com> wrote:
>
>     My final calculation result is written to Kafka in the following
>     way. Because KafkaTableSink does not support retract mode, I am not
>     sure whether this method will affect the calculation result.
>
>     val userTest: Table = tEnv.sqlQuery(sql)
>
>     val endStream = tEnv.toRetractStream[Row](userTest)
>
>     //userTest.insertInto("kafkaSink")
>
>     val myProducer = new FlinkKafkaProducer011[String](
>       kafkaBrokers,         // broker list
>       topic,   // target topic
>       new SimpleStringSchema)   // serialization schema
>
>     endStream.map(x=>{
>       s"${x._1}:${x._2.toString}"
>     }).addSink(myProducer)
>
>
>
>     --
>     Sent from:
>     http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: About the retract of the calculation result of flink sql

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Clay,

If you call env.setParallelism(1), the query won't be executed in parallel.
However, looking at your screenshot, the message order does not seem to be
the problem here (given that you printed the content of the topic).

Are you sure that the result cannot decrease when some rows are added to
one of the input tables?
I don't have time to dig into your query, but the HAVING clause or the left
join with the (u.id is null) predicate looks a bit suspicious to me.

Would it be possible to create a minimal example that reproduces the issue?

Best, Fabian

On Mon, Oct 1, 2018 at 15:11, clay4444 <cl...@gmail.com> wrote:

> Hi Timo,
>
> I call env.setParallelism(1) in my code, which sets the overall
> parallelism of the program to 1. Will some calculations still be
> parallelized?
>
> clay,

Re: About the retract of the calculation result of flink sql

Posted by Hequn Cheng <ch...@gmail.com>.
Hi clay,

Keyed group by:

> SELECT a, SUM(b) as d
> FROM Orders
> GROUP BY a


Non-keyed group by:

> SELECT SUM(b) as d
> FROM Orders


I would like to look into the problem, but I can't find any obvious
issue in the SQL. It would be great if you could provide a minimal example
that reproduces the issue. Also, use a print sink to avoid writing to multiple
Kafka partitions, since that can also cause out-of-order results.
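
To make the interaction between the two query shapes concrete, here is a
small simulation in plain Scala. It is not Flink API code. It assumes a keyed
SUM feeding a non-keyed SUM, and the names `keyedSum` and `globalSum` and the
sample orders are hypothetical. It shows how a retraction from the keyed
group by briefly lowers the downstream total before the updated row is added:

```scala
object RetractDemo {
  // (flag, (key, sum)): flag true = add, false = retract.
  type Change = (Boolean, (String, Int))

  // Simulates: SELECT a, SUM(b) AS d FROM Orders GROUP BY a
  def keyedSum(orders: Seq[(String, Int)]): Seq[Change] = {
    val sums = scala.collection.mutable.Map.empty[String, Int]
    orders.flatMap { case (a, b) =>
      val previous = sums.get(a)
      val updated = previous.getOrElse(0) + b
      sums(a) = updated
      // Retract the old row for this key (if any), then add the new one.
      previous.map(p => (false, (a, p))).toSeq :+ ((true, (a, updated)))
    }
  }

  // Simulates a non-keyed SUM(d) over that changelog and returns the value
  // visible downstream after every single change message.
  def globalSum(changes: Seq[Change]): Seq[Int] =
    changes.scanLeft(0) { case (total, (isAdd, (_, d))) =>
      if (isAdd) total + d else total - d
    }.tail

  def main(args: Array[String]): Unit = {
    val orders = Seq(("x", 1), ("y", 2), ("x", 3))
    println(globalSum(keyedSum(orders))) // List(1, 3, 2, 6)
  }
}
```

The total dips from 3 to 2 when the old row for key x is retracted, then
settles at 6, the correct final sum.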

Best, Hequn

On Mon, Oct 1, 2018 at 9:11 PM clay4444 <cl...@gmail.com> wrote:

> Hi Timo,
>
> I call env.setParallelism(1) in my code, which sets the overall
> parallelism of the program to 1. Will some calculations still be
> parallelized?
>
> clay,

Re: About the retract of the calculation result of flink sql

Posted by clay4444 <cl...@gmail.com>.
Hi Timo,

I call env.setParallelism(1) in my code, which sets the overall
parallelism of the program to 1. Will some calculations still be
parallelized?

clay,



