Posted to user@flink.apache.org by clay4444 <cl...@gmail.com> on 2018/12/03 15:15:12 UTC

If you are an expert in flink sql, then I really need your help...

    I am using Flink SQL to do some complicated calculations, and I have
run into some very difficult problems in the process, so I would like to
ask everyone for help. My goal is to build a data stream that produces
accurate results, in line with the core ideas of the book Streaming
Systems. I use Kafka to receive the MySQL binlog as the data source, join
the streams into multiple tables, and then perform complex SQL
calculations on those tables. I found that Flink does not provide an
upsert implementation, so I added a last_value(xxx), last_value(xxx) ..
GROUP BY (id) operation for each Kafka data source to ensure consistency
of the final result. This works, but I understand it caches a dynamic
table, resulting in a large state (about 3 GB), and it seems to introduce
some other very strange problems, summarized as follows:

1. Even when the SQL is very complicated and checkpointing is clearly
enabled, the web interface shows that no checkpoint is ever taken, not a
single one.
2. While the program is running, it frequently fails, always with one of
the following errors:

(1) the assigned slot id_xxxxx was removed
(2) the heartbeat with taskmanager was timeout

I have used slotSharingGroup to split tasks into different slots wherever
possible, but these two errors still occur often, causing the program to
fail.

I have no clue about these errors. If anyone can help, I would really
appreciate it.

Added: I receive data from 4 Kafka topics; the largest contains more than
20 million records.
My startup command is as follows:
 
Flink1.6/bin/flink run -m yarn-cluster -ytm 23240 -yn 3 -ys 2 -ynm xxxx -yqu
xxxx -c xxxxxxx xxx.jar ./test.conf
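
To illustrate the upsert emulation described above, the per-source
deduplication could take roughly this shape (the table and column names
here are hypothetical, not from the original job):

```sql
-- Hypothetical sketch: keep the latest value per primary key from a
-- binlog stream, emulating upsert semantics with last_value + GROUP BY.
SELECT
  id,
  last_value(name)  AS name,
  last_value(price) AS price
FROM binlog_source
GROUP BY id
```

Each such GROUP BY materializes a dynamic table keyed by id, which is
where the roughly 3 GB of state comes from.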



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: If you are an expert in flink sql, then I really need your help...

Posted by Timo Walther <tw...@apache.org>.
Unfortunately, setting the parallelism per SQL operator is not supported
right now.

We are currently thinking about a way to give fine-grained control over
the properties of SQL operators, but this is in an early design phase and
might take a while.



On 04.12.18 at 13:05, clay4444 wrote:
> [...]



Re: If you are an expert in flink sql, then I really need your help...

Posted by clay4444 <cl...@gmail.com>.
Hi Timo,

First of all, thank you very much; I have solved the problems.

Regarding the problem of the state being too large: I set the program's
global parallelism to 7, which solved my problem very well, and
checkpoints are now very fast. But I would like to ask: is there a way to
set the parallelism for each operator (translated from the SQL statement)
instead of only globally?
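
For reference, the only knob available at this level in Flink 1.6 is the
environment's default parallelism, which applies to every operator the SQL
is translated into (a minimal sketch; the rest of the job setup is
assumed):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Applies to all operators translated from SQL; per-operator
// parallelism is not exposed by the Table/SQL API in Flink 1.6.
env.setParallelism(7)
val tEnv = TableEnvironment.getTableEnvironment(env)
```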





Re: If you are an expert in flink sql, then I really need your help...

Posted by Timo Walther <tw...@apache.org>.
Hi,

yes, this was unintended behavior that got fixed in Flink 1.7.

See https://issues.apache.org/jira/browse/FLINK-10474

Regards,
Timo


On 04.12.18 at 05:21, clay4444 wrote:
> [...]



Re: If you are an expert in flink sql, then I really need your help...

Posted by clay4444 <cl...@gmail.com>.
I have found out why checkpoints are not triggered. It is related to the
IN operator in Flink SQL. This query triggers checkpoints normally:

select name,age from user where id in
(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840)

But the same query does not trigger checkpoints when the IN list contains
one more element:

(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840,123456)

Is this a bug?




Re: If you are an expert in flink sql, then I really need your help...

Posted by clay4444 <cl...@gmail.com>.
Hi Timo,

The LAST_VALUE function simply groups by id and then takes the latest row
for each primary key. I was inspired by this answer:
https://stackoverflow.com/questions/48554999/apache-flink-how-to-enable-upsert-mode-for-dynamic-tables

Its implementation is also very simple:

import java.lang.{String => JString}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.AggregateFunction

// Accumulator holding the most recent non-empty value seen for a key.
class Middle2 extends Serializable {
  private val serialVersionUID = 3L
  var mid: String = "none"
}

class StringLastValueFunc extends AggregateFunction[JString, Middle2] {

  override def createAccumulator(): Middle2 = {
    new Middle2
  }

  // Called once per input row; keeps the latest non-null, non-empty value.
  def accumulate(acc: Middle2, iValue: String): Unit = {
    if (iValue != null && iValue.nonEmpty) {
      acc.mid = iValue
    }
  }

  override def getValue(acc: Middle2): JString = {
    acc.mid
  }

  override def getResultType: TypeInformation[JString] = Types.STRING
}


And I don't think I should set a state expiration time, because the data
for each primary key can change at any time.

I have used the RocksDB backend and enabled incremental checkpoints.
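
For completeness, wiring the function into a query would look roughly like
this (the table and column names are illustrative assumptions, and tEnv is
a StreamTableEnvironment created elsewhere):

```scala
// Register the UDAF and use it to keep the latest row per primary key.
tEnv.registerFunction("last_value", new StringLastValueFunc)
val deduped = tEnv.sqlQuery(
  "SELECT id, last_value(name) AS name FROM binlog_source GROUP BY id")
```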




Re: If you are an expert in flink sql, then I really need your help...

Posted by Timo Walther <tw...@apache.org>.
Hi,

it is very difficult to spot the problem with the little information you 
gave us.

Maybe you can show us a simplified SQL query and the implementation of 
the `LAST_VALUE` function?

An initial guess would be that you are running out of memory, such that
YARN kills your task manager. If you are sure that your state size remains
constant, you could also try a different state backend that spills to
disk. Have you tried the RocksDB state backend?

Did you configure a state retention time?
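
In case RocksDB has not been tried yet, a minimal setup in Flink 1.6 would
be along these lines (the checkpoint URI and interval are placeholders):

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // checkpoint every 60 s
// The second constructor argument enables incremental checkpoints,
// so only changed SST files are uploaded on each checkpoint.
env.setStateBackend(
  new RocksDBStateBackend("hdfs:///flink/checkpoints", true))
```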

Regards,
Timo


On 03.12.18 at 16:15, clay4444 wrote:
> [...]