Posted to user@flink.apache.org by Eva Eva <et...@gmail.com> on 2020/01/10 02:29:53 UTC

Please suggest helpful tools

Hi,

I'm running a Flink job on version 1.9 with the Blink planner.

My checkpoints time out intermittently, and as the state grows they time
out more and more often, eventually killing the job.

The state is large: Minimum=10.2MB, Maximum=49GB (this one accumulated
because of prior failed checkpoints), Average=8.44GB.

Although the state is huge, I have enough space on the EC2 instance where
the job runs. I'm using RocksDB for checkpointing.

*The logs do not have any useful information to explain why checkpoints
are expiring/failing. Can someone please point me to tools that can be used
to investigate and understand why checkpoints are failing?*

Any other related suggestions are also welcome.


Thanks,
Reva.

Re: Please suggest helpful tools

Posted by Eva Eva <et...@gmail.com>.
No, the job did not fail. I attached this sample screenshot to show how
checkpoints often fail after a certain point.
This particular job was run with only ~4 million records, so it ran fine and
did not fail. It's been up for 14h 58m 45s. It has just stopped and is waiting
at 4,087,756 records because there are no more incoming messages from the
Kafka source at the moment.

[image: image.png]


Not sure if this is how it works, but the job keeps checkpointing even when
there are no new messages to process. Below is a checkpoints screenshot.

[image: image.png]

Thanks,
Eva


On Fri, Jan 17, 2020 at 12:26 AM Kurt Young <yk...@gmail.com> wrote:

> It looks like your job failed; some exception stack traces would help.
>
> Best,
> Kurt
>
>
> On Fri, Jan 17, 2020 at 10:03 AM RKandoji <rk...@gmail.com> wrote:
>
>> Hi Kurt,
>>
>> I see a significant improvement after fixing the data skew (by
>> replacing NULLs with randomly generated values). The job is processing a
>> lot more records than before. Earlier, the checkpoint expiry issue caused
>> the job to hang at ~2 million records; now, after the data skew fix, it
>> hangs after processing ~4 million records.
>>
>> I still see HIGH backpressure for some operations. Using the UI I can
>> see which operations and which subtasks are not able to reach the
>> "100% acknowledged" status, but I'm not sure what to investigate next to
>> fix it. Could you please suggest what else I should check or try?
>>
>> [image: image.png]
>>
>> Thanks,
>> Eva
>>
>>
>> On Thu, Jan 16, 2020 at 8:37 PM Eva Eva <et...@gmail.com>
>> wrote:
>>
>>> OK, I'll give it a try and let you know how it goes. At the moment I'm
>>> using randomly generated values to replace nulls, and it's helping with
>>> the data skew.
>>>
>>> Thanks,
>>> Eva
>>>
>>>
>>> On Thu, Jan 16, 2020 at 2:02 AM Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> I just realized that filtering null values in a left outer join
>>>> like this does not work:
>>>>
>>>> select * from L left outer join R on L.k1 = R.k1 AND L.k1 is not null
>>>>
>>>> You will still receive all records from the L table, even those where
>>>> L.k1 is null.
>>>> The valid way is to use an inner query to first filter out all null
>>>> values and then do the left outer join, like this:
>>>>
>>>> select * from
>>>>     (select * from L where L.k1 is not null) filteredL
>>>>     left outer join R
>>>>     on filteredL.k1 = R.k1
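
[Editor's illustration] The difference between the two queries above can be checked on any SQL engine. The following is a minimal sketch using Python's built-in sqlite3 module rather than Flink; the tables L and R and their three sample rows are made up for the demonstration, and only standard left-outer-join semantics are assumed.

```python
import sqlite3

# In-memory database with hypothetical sample data: two of three L rows have a NULL key.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE L (id INTEGER, k1 TEXT);
    CREATE TABLE R (k1 TEXT, v TEXT);
    INSERT INTO L VALUES (1, 'a'), (2, NULL), (3, NULL);
    INSERT INTO R VALUES ('a', 'match');
""")

# Predicate in the ON clause: it only decides whether a row of L finds a match,
# so every row of L still appears in the output.
on_clause_rows = conn.execute("""
    SELECT L.id, L.k1, R.v
    FROM L LEFT OUTER JOIN R
      ON L.k1 = R.k1 AND L.k1 IS NOT NULL
    ORDER BY L.id
""").fetchall()

# Predicate in a subquery: rows with NULL keys are removed before the join.
filtered_rows = conn.execute("""
    SELECT filteredL.id, filteredL.k1, R.v
    FROM (SELECT * FROM L WHERE L.k1 IS NOT NULL) filteredL
    LEFT OUTER JOIN R
      ON filteredL.k1 = R.k1
    ORDER BY filteredL.id
""").fetchall()

print("ON-clause filter:", on_clause_rows)
print("Subquery filter: ", filtered_rows)
```

Note that the subquery form drops the NULL-key rows from the result entirely. If those rows are still needed downstream, they would have to be added back separately (for example with a UNION ALL), which is a design choice the thread does not settle.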
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Jan 16, 2020 at 4:19 AM Eva Eva <et...@gmail.com>
>>>> wrote:
>>>>
>>>>> You are correct, there is a difference in the query. Below are the
>>>>> operators, with their where conditions, for the version with the
>>>>> NOT NULL filter:
>>>>>
>>>>> Deduplicate(keep=[LastRow], key=[ucPKL], order=[PROCTIME]) ->
>>>>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, listAgentKeyL IS NOT NULL AS $f16])
>>>>>
>>>>>
>>>>> Join(joinType=[LeftOuterJoin], where=[((listAgentKeyL = ucPKA) AND
>>>>> $f16)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, $f16, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>>> dataA, proctime0, row_num], leftInputSpec=[HasUniqueKey],
>>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
>>>>> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
>>>>> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
>>>>> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
>>>>> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
>>>>> row_num AS row_num0, buyerAgentKeyL IS NOT NULL AS $f24])
>>>>>
>>>>>
>>>>> Join(joinType=[LeftOuterJoin], where=[((buyerAgentKeyL = ucPKA0) AND
>>>>> $f24)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>>> dataA, proctime0, row_num0, $f24, ucPKA0, ucUpdateTSA0, ucVersionA0,
>>>>> ucRowTypeA0, ucTypeA0, dataA0, proctime1, row_num],
>>>>> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
>>>>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>>> ucTypeA0, dataA0, proctime1, row_num AS row_num1, coListAgentKeyL IS NOT
>>>>> NULL AS $f32])
>>>>>
>>>>>
>>>>> Join(joinType=[LeftOuterJoin], where=[((coListAgentKeyL = ucPKA1) AND
>>>>> $f32)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>>> ucTypeA0, dataA0, proctime1, row_num1, $f32, ucPKA1, ucUpdateTSA1,
>>>>> ucVersionA1, ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num],
>>>>> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
>>>>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>>> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
>>>>> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num AS row_num2,
>>>>> coBuyerAgentKeyL IS NOT NULL AS $f40])
>>>>>
>>>>>
>>>>> Join(joinType=[LeftOuterJoin], where=[((coBuyerAgentKeyL = ucPKA2) AND
>>>>> $f40)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>>> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
>>>>> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num2, $f40, ucPKA2,
>>>>> ucUpdateTSA2, ucVersionA2, ucRowTypeA2, ucTypeA2, dataA2, proctime3,
>>>>> row_num], leftInputSpec=[HasUniqueKey],
>>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
>>>>> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
>>>>> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
>>>>> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
>>>>> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
>>>>> row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0, ucTypeA0, dataA0,
>>>>> proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1, ucRowTypeA1,
>>>>> ucTypeA1, dataA1, proctime2, row_num2, ucPKA2, ucUpdateTSA2, ucVersionA2,
>>>>> ucRowTypeA2, ucTypeA2, dataA2, proctime3, row_num AS row_num3,
>>>>> listOfficeKeyL IS NOT NULL AS $f48])
>>>>>
>>>>> 1. So the number of records received will remain the same with
>>>>> and without the NOT NULL filter. In the scenario with the NOT NULL
>>>>> filter, there is an extra field indicating whether the JOIN key is NULL.
>>>>> 2. If point #1 above is correct, then a quick question: would this
>>>>> extra field (indicating whether the JOIN key is NULL) help avoid
>>>>> unnecessary processing when the JOIN key is null?
>>>>> 3. If point #2 above is correct, then I should get optimal
>>>>> performance even though there is data skew.
>>>>>
>>>>> Please let me know your thoughts.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Eva
>>>>>
>>>>> On Wed, Jan 15, 2020 at 3:03 AM Kurt Young <yk...@gmail.com> wrote:
>>>>>
>>>>>> > but I have noticed there is no change in the list of operations
>>>>>> > with or without NOT NULL filter for JOIN keys.
>>>>>>
>>>>>> I noticed that the "where" conditions in these two plans are
>>>>>> different. The plan without NOT NULL only has "listAgentKeyL = ucPKA",
>>>>>> but the plan with NOT NULL
>>>>>> has "((listAgentKeyL = ucPKA) AND (...))", which is truncated by the
>>>>>> display. Could you show the full operator names for the version
>>>>>> with NOT NULL?
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 15, 2020 at 2:45 AM Eva Eva <
>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kurt,
>>>>>>>
>>>>>>> Below are the three scenarios that I tested:
>>>>>>>
>>>>>>> 1. Without "NOT NULL" filter for the join key.
>>>>>>> Observation: data skew exists, as expected.
>>>>>>> Sample query:
>>>>>>> "SELECT * FROM latestListings l " +
>>>>>>>
>>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> 2. With "NOT NULL" filter for the join key.
>>>>>>> Observation: data skew exists, same as above case 1
>>>>>>> Sample query:
>>>>>>>
>>>>>>> "SELECT * FROM latestListings l " +
>>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>>>>
>>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>>>>
>>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> 3. I replaced all NULL values for Join keys with randomly generated
>>>>>>> values.
>>>>>>> Observation: data skew gone, absence of NULL values resulted in
>>>>>>> good data distribution
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> [image: image.png]
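
[Editor's illustration] The effect reported in scenarios 1 and 3 can be reproduced with a toy model of key-based partitioning. This is a sketch under stated assumptions: the partitioner below is a plain stable hash modulo the parallelism, not Flink's actual key-group assignment, and the key counts and names (`agent-…`, `null-…`) are hypothetical.

```python
import hashlib
import uuid
from collections import Counter

PARALLELISM = 26  # the parallelism mentioned in the thread


def subtask_for(key, parallelism=PARALLELISM):
    """Toy partitioner: stable hash of the key modulo the parallelism.

    This is an assumption for illustration, not Flink's real key-group logic.
    """
    digest = hashlib.md5(repr(key).encode("utf-8")).hexdigest()
    return int(digest, 16) % parallelism


def load_per_subtask(keys):
    """Count how many records each subtask would receive."""
    return Counter(subtask_for(k) for k in keys)


# Hypothetical data: 1,000 rows with real agent keys and 9,000 rows with no agent (NULL).
real_keys = [f"agent-{i}" for i in range(1000)]
null_keys = [None] * 9000

# All NULL keys hash to the same value, so one subtask receives all of them.
skewed = load_per_subtask(real_keys + null_keys)

# Replace each NULL with a random value that cannot match any real key.
salted = [f"null-{uuid.uuid4()}" for _ in null_keys]
balanced = load_per_subtask(real_keys + salted)

print("max records on one subtask with NULL keys:  ", max(skewed.values()))
print("max records on one subtask with random keys:", max(balanced.values()))
```

The random replacement only preserves the left join result if the generated values can never equal a real key on the right side, and the output will carry the generated value instead of NULL unless it is mapped back afterwards.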
>>>>>>>
>>>>>>>
>>>>>>> I have pasted pictures to show the number of records and the list of
>>>>>>> operations.
>>>>>>> Not sure if I understood your question correctly, but I have noticed
>>>>>>> there is no change in the list of operations with or without the NOT NULL
>>>>>>> filter for JOIN keys. From the task flow diagram, the JOIN runs right
>>>>>>> after deduplication, without any other operation in between.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eva
>>>>>>>
>>>>>>> On Mon, Jan 13, 2020 at 8:41 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>>>
>>>>>>>> First, could you check whether the added filter conditions are
>>>>>>>> executed before the join operators? If they are
>>>>>>>> already pushed down and executed before the join, then some
>>>>>>>> real join keys must be generating the data skew.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Kurt
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 14, 2020 at 5:09 AM Eva Eva <
>>>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kurt,
>>>>>>>>>
>>>>>>>>> Assuming I'm joining two tables, "latestListings" and
>>>>>>>>> "latestAgents" like below:
>>>>>>>>>
>>>>>>>>> "SELECT * FROM latestListings l " +
>>>>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In order to avoid joining on NULL keys, are you suggesting that I
>>>>>>>>> change the query as below:
>>>>>>>>>
>>>>>>>>> "SELECT * FROM latestListings l " +
>>>>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>>>>>>
>>>>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>>>>>>
>>>>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I tried this but noticed that it didn't work as the data skew (and heavy load on one task) continued. Could you please let me know if I missed anything?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Eva
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Jan 12, 2020 at 8:44 PM Kurt Young <yk...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> You can try to filter NULL values with an explicit condition like
>>>>>>>>>> "xxxx is not NULL".
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Kurt
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <
>>>>>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you both for the suggestions.
>>>>>>>>>>> I did a bit more analysis using the UI and identified at least one
>>>>>>>>>>> problem that's occurring with the job right now. I'm going to fix it
>>>>>>>>>>> first and then take it from there.
>>>>>>>>>>>
>>>>>>>>>>> *Problem that I identified:*
>>>>>>>>>>> I'm running with a parallelism of 26. For the checkpoints that are
>>>>>>>>>>> expiring, one of the JOIN operations finishes at 25/26 (96%)
>>>>>>>>>>> progress, with the corresponding SubTask:21 showing an "n/a" value.
>>>>>>>>>>> For the same operation I also noticed that the load is distributed
>>>>>>>>>>> poorly, with heavy load being fed to SubTask:21.
>>>>>>>>>>> My guess is that a bunch of null values occur in this JOIN
>>>>>>>>>>> operation and are all routed to the same task.
>>>>>>>>>>> Currently I'm using a SQL query, which gives me limited control over
>>>>>>>>>>> handling null values, so I'll try to JOIN programmatically and see if
>>>>>>>>>>> I can avoid the JOIN operation whenever the joining value is null.
>>>>>>>>>>> This should help with better load distribution across subtasks, and
>>>>>>>>>>> may also fix the expiring checkpoint issue.
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the guidance.
>>>>>>>>>>> Eva.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <
>>>>>>>>>>> qcx978132955@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> For an expired checkpoint, you can find something like
>>>>>>>>>>>> "Checkpoint xxx of job xx expired before completing" in
>>>>>>>>>>>> jobmanager.log. Then you can go to the checkpoint UI to find which
>>>>>>>>>>>> tasks did not ack, and go to those tasks to see what happened.
>>>>>>>>>>>>
>>>>>>>>>>>> If a checkpoint was declined, you can find something like
>>>>>>>>>>>> "Decline checkpoint xxx by task xxx of job xxx at xxx." in
>>>>>>>>>>>> jobmanager.log. In this case, you can go to the task directly to
>>>>>>>>>>>> find out why the checkpoint failed.
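
[Editor's illustration] Searching jobmanager.log for the two phrases quoted above can be scripted. The sketch below assumes only the phrases as they appear in this message; the function name `scan_checkpoint_failures`, the regular expressions, and the sample lines are hypothetical, and real log lines will carry extra prefixes (timestamps, logger names) that the patterns simply skip over.

```python
import re

# Patterns built from the phrases quoted in the message above.
# The exact layout of real log lines is an assumption.
EXPIRED = re.compile(r"Checkpoint (\d+) of job (\S+) expired before completing")
DECLINED = re.compile(r"Decline checkpoint (\d+) by task (\S+) of job (\S+)")


def scan_checkpoint_failures(lines):
    """Return (expired, declined) lists of checkpoint ids found in the given lines."""
    expired, declined = [], []
    for line in lines:
        match = EXPIRED.search(line)
        if match:
            expired.append(int(match.group(1)))
            continue
        match = DECLINED.search(line)
        if match:
            declined.append(int(match.group(1)))
    return expired, declined


# Made-up sample lines, for illustration only (ids and names are placeholders).
sample = [
    "Checkpoint 42 of job abc123 expired before completing.",
    "Decline checkpoint 43 by task task-1 of job abc123 at host-1.",
    "Some unrelated log line.",
]

expired, declined = scan_checkpoint_failures(sample)
print("expired checkpoints: ", expired)
print("declined checkpoints:", declined)
```

To run it against a real log, pass an open file instead of the sample list, for example `scan_checkpoint_failures(open("jobmanager.log"))`, then look up the reported checkpoint ids in the checkpoint UI as described above.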
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Congxian
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 10, 2020 at 7:31 PM Yun Tang <my...@live.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Eva
>>>>>>>>>>>>>
>>>>>>>>>>>>> If a checkpoint failed, please check the web UI or the jobmanager
>>>>>>>>>>>>> log to see why it failed; it might have been declined by a
>>>>>>>>>>>>> specific task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If a checkpoint expired, you can also use the web UI to see
>>>>>>>>>>>>> which tasks did not respond in time; a hot task might not be able
>>>>>>>>>>>>> to respond in time. Generally speaking, checkpoint expiry is mostly
>>>>>>>>>>>>> caused by back pressure, which keeps the checkpoint barrier from
>>>>>>>>>>>>> arriving in time. Resolving the back pressure could help
>>>>>>>>>>>>> checkpoints finish before the timeout.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the docs on monitoring checkpoints [1] and
>>>>>>>>>>>>> back pressure [2] in the web UI could help you.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best
>>>>>>>>>>>>> Yun Tang
>>>>>>>>>>>>

Re: Please suggest helpful tools

Posted by Kurt Young <yk...@gmail.com>.
It looks like your job failed; some exception stack traces would help.

Best,
Kurt


On Fri, Jan 17, 2020 at 10:03 AM RKandoji <rk...@gmail.com> wrote:

> Hi Kurt,
>
> I'm able to see significant improvement after fixing data skew (by
> replacing NULLs with random generated value). It is processing lot more
> records than earlier. Earlier checkpoint expiring issue caused job to hang
> at ~2Million records, now after data skew fix its hanging after processing
> ~4Million records.
>
> I see there is still HIGH backpressure for some operations, using UI I'm
> able to see which operations and which subtasks are not able to reach
> "100% acknowledged" status but I'm not sure what to investigate next to fix
> it? Could you please suggest what else should I be checking or trying out.
>
> [image: image.png]
>
> Thanks,
> Eva
>
>
> On Thu, Jan 16, 2020 at 8:37 PM Eva Eva <et...@gmail.com>
> wrote:
>
>> ok, I'll give it a try and let you know how it goes. atm I'm using random
>> generated values to replace null and its helping with the data skew.
>>
>> Thanks,
>> Eva
>>
>>
>> On Thu, Jan 16, 2020 at 2:02 AM Kurt Young <yk...@gmail.com> wrote:
>>
>>> I just realized that it's invalid to filter null values in left outer
>>> join like this:
>>>
>>> select * from L left outer join R on L.k1 = R.k1 AND L.k1 is not null
>>>
>>> You will still receive all records from L table even if L.k1 equals null.
>>> The valid way should be use a inner table to first filter out all null
>>> values and then
>>> do left outer join, like this:
>>>
>>> select * from
>>>     (select * from L where L.k1 is not null) filteredL
>>>     left outer join R
>>>     on filteredL.k1 = R.k1
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Jan 16, 2020 at 4:19 AM Eva Eva <et...@gmail.com>
>>> wrote:
>>>
>>>> You are correct, there is a difference in the query. Below are the
>>>> queries with WHERE clause:
>>>>
>>>> Deduplicate(keep=[LastRow], key=[ucPKL], order=[PROCTIME]) ->
>>>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, listAgentKeyL IS NOT NULL AS $f16])
>>>>
>>>>
>>>> Join(joinType=[LeftOuterJoin], where=[((listAgentKeyL = ucPKA) AND
>>>> $f16)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, $f16, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>> dataA, proctime0, row_num], leftInputSpec=[HasUniqueKey],
>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
>>>> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
>>>> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
>>>> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
>>>> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
>>>> row_num AS row_num0, buyerAgentKeyL IS NOT NULL AS $f24])
>>>>
>>>>
>>>> Join(joinType=[LeftOuterJoin], where=[((buyerAgentKeyL = ucPKA0) AND
>>>> $f24)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>> dataA, proctime0, row_num0, $f24, ucPKA0, ucUpdateTSA0, ucVersionA0,
>>>> ucRowTypeA0, ucTypeA0, dataA0, proctime1, row_num],
>>>> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
>>>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>> ucTypeA0, dataA0, proctime1, row_num AS row_num1, coListAgentKeyL IS NOT
>>>> NULL AS $f32])
>>>>
>>>>
>>>> Join(joinType=[LeftOuterJoin], where=[((coListAgentKeyL = ucPKA1) AND
>>>> $f32)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>> ucTypeA0, dataA0, proctime1, row_num1, $f32, ucPKA1, ucUpdateTSA1,
>>>> ucVersionA1, ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num],
>>>> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
>>>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
>>>> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num AS row_num2,
>>>> coBuyerAgentKeyL IS NOT NULL AS $f40])
>>>>
>>>>
>>>> Join(joinType=[LeftOuterJoin], where=[((coBuyerAgentKeyL = ucPKA2) AND
>>>> $f40)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>>>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>>>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>>>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>>>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>>>> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
>>>> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num2, $f40, ucPKA2,
>>>> ucUpdateTSA2, ucVersionA2, ucRowTypeA2, ucTypeA2, dataA2, proctime3,
>>>> row_num], leftInputSpec=[HasUniqueKey],
>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
>>>> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
>>>> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
>>>> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
>>>> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
>>>> row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0, ucTypeA0, dataA0,
>>>> proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1, ucRowTypeA1,
>>>> ucTypeA1, dataA1, proctime2, row_num2, ucPKA2, ucUpdateTSA2, ucVersionA2,
>>>> ucRowTypeA2, ucTypeA2, dataA2, proctime3, row_num AS row_num3,
>>>> listOfficeKeyL IS NOT NULL AS $f48])
>>>>
>>>> 1. So the number of records received will remain same for both with and
>>>> without NOT NULL filter. For the scenario with NOT NULL filter, there is an
>>>> extra field indicating whether the JOIN key is NULL or not.
>>>> 2. If the above #1 point is correct, then a qq, would this extra field (indicating
>>>> whether the JOIN key is NULL or not) help in avoiding unnecessary
>>>> processing for the cases when the JOIN key is null?
>>>> 3. If the above #2 point is correct, then I should have optimal
>>>> performance even though there is a data skew.
>>>>
>>>> Please let me know your thoughts?
>>>>
>>>>
>>>> Thanks,
>>>> Eva
>>>>
>>>> On Wed, Jan 15, 2020 at 3:03 AM Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> > but I have noticed there is no change in the list of operations
>>>>> with or without NOT NULL filter for JOIN keys.
>>>>>
>>>>> I noticed that the "where" condition in these two plans are different.
>>>>> The without NOT NULL only has "listAgentKeyL = ucPKA" but the with NOT NULL
>>>>> has "((listAgentKeyL = ucPKA) AND (...))" but truncated by the
>>>>> display. Could you show the full name of operators for the version which
>>>>> with NOT NULL?
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Wed, Jan 15, 2020 at 2:45 AM Eva Eva <et...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Kurt,
>>>>>>
>>>>>> Below are the three scenarios that I tested:
>>>>>>
>>>>>> 1. Without "NOT NULL" filter for the join key.
>>>>>> Observation: data skew exists, as expected.
>>>>>> Sample query:
>>>>>> "SELECT * FROM latestListings l " +
>>>>>>
>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> 2. With "NOT NULL" filter for the join key.
>>>>>> Observation: data skew exists, same as above case 1
>>>>>> Sample query:
>>>>>>
>>>>>> "SELECT * FROM latestListings l " +
>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>>>
>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>>>
>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> 3. I replaced all NULL values for Join keys with randomly generated
>>>>>> values.
>>>>>> Observation: data skew gone, absence of NULL values resulted in good
>>>>>> data distribution
>>>>>> [image: image.png]
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>>
>>>>>> I have pasted pictures to show the number of records and the list of
>>>>>> operations.
>>>>>> Not sure if I understood your question correctly, but I have noticed
>>>>>> there is no change in the list of operations with or without NOT NULL
>>>>>> filter for JOIN keys. From the task flow diagram, JOIN is kicked after
>>>>>> DeDuplication, without any other operation in between.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Eva
>>>>>>
>>>>>> On Mon, Jan 13, 2020 at 8:41 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>>
>>>>>>> First could you check whether the added filter conditions are
>>>>>>> executed before join operators? If they are
>>>>>>> already pushed down and executed before join, it's should be some
>>>>>>> real join keys generating data skew.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 14, 2020 at 5:09 AM Eva Eva <
>>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kurt,
>>>>>>>>
>>>>>>>> Assuming I'm joining two tables, "latestListings" and
>>>>>>>> "latestAgents" like below:
>>>>>>>>
>>>>>>>> "SELECT * FROM latestListings l " +
>>>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>>>>>
>>>>>>>>
>>>>>>>> In order to avoid joining on NULL keys, are you suggesting that I
>>>>>>>> change the query as below:
>>>>>>>>
>>>>>>>> "SELECT * FROM latestListings l " +
>>>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>>>>>
>>>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>>>>>
>>>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>>>>>
>>>>>>>>
>>>>>>>> I tried this but noticed that it didn't work as the data skew (and heavy load on one task) continued. Could you please let me know if I missed anything?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Eva
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Jan 12, 2020 at 8:44 PM Kurt Young <yk...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You can try to filter NULL values with an explicit condition like
>>>>>>>>> "xxxx is not NULL".
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <
>>>>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you both for the suggestions.
>>>>>>>>>> I did a bit more analysis using UI and identified at least one
>>>>>>>>>> problem that's occurring with the job rn. Going to fix it first and then
>>>>>>>>>> take it from there.
>>>>>>>>>>
>>>>>>>>>> *Problem that I identified:*
>>>>>>>>>> I'm running with 26 parallelism. For the checkpoints that are
>>>>>>>>>> expiring, one of a JOIN operation is finishing at 25/26 (96%)
>>>>>>>>>> progress with corresponding SubTask:21 has "n/a" value. For the
>>>>>>>>>> same operation I also noticed that the load is distributed poorly with
>>>>>>>>>> heavy load being fed to SubTask:21.
>>>>>>>>>> My guess is bunch of null values are happening for this JOIN
>>>>>>>>>> operation and being put into the same task.
>>>>>>>>>> Currently I'm using SQL query which gives me limited control on
>>>>>>>>>> handling null values so I'll try to programmatically JOIN and see if I can
>>>>>>>>>> avoid JOIN operation whenever the joining value is null. This should help
>>>>>>>>>> with better load distribution across subtasks. And may also fix expiring
>>>>>>>>>> checkpointing issue.
>>>>>>>>>>
>>>>>>>>>> Thanks for the guidance.
>>>>>>>>>> Eva.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <
>>>>>>>>>> qcx978132955@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi
>>>>>>>>>>>
>>>>>>>>>>> For an expired checkpoint, you can find something like "Checkpoint
>>>>>>>>>>> xxx of job xx expired before completing" in jobmanager.log, then you can go
>>>>>>>>>>> to the checkpoint UI to find which tasks did not ack, and go to these tasks
>>>>>>>>>>> to see what happened.
>>>>>>>>>>>
>>>>>>>>>>> If a checkpoint has been declined, you can find something like
>>>>>>>>>>> "Decline checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log;
>>>>>>>>>>> in this case, you can go to the task directly to find out why the
>>>>>>>>>>> checkpoint failed.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Congxian
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 10, 2020 at 7:31 PM Yun Tang <my...@live.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eva
>>>>>>>>>>>>
>>>>>>>>>>>> If a checkpoint failed, please check the web UI or the jobmanager log
>>>>>>>>>>>> to see why it failed; it might have been declined by some specific task.
>>>>>>>>>>>>
>>>>>>>>>>>> If a checkpoint expired, you can also use the web UI to see
>>>>>>>>>>>> which tasks did not respond in time; some hot task might not be able to
>>>>>>>>>>>> respond in time. Generally speaking, checkpoint expiration is mostly caused by
>>>>>>>>>>>> back pressure, which keeps the checkpoint barrier from arriving in time.
>>>>>>>>>>>> Resolving the back pressure could help the checkpoint finish before the timeout.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the docs on monitoring checkpoints in the web UI [1] and
>>>>>>>>>>>> back pressure [2] could help you.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>> Yun Tang
>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>> *From:* Eva Eva <et...@gmail.com>
>>>>>>>>>>>> *Sent:* Friday, January 10, 2020 10:29
>>>>>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>>>>>> *Subject:* Please suggest helpful tools
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm running a Flink job on version 1.9 with the blink planner.
>>>>>>>>>>>>
>>>>>>>>>>>> My checkpoints are timing out intermittently, but as state
>>>>>>>>>>>> grows they are timing out more and more often, eventually killing the job.
>>>>>>>>>>>>
>>>>>>>>>>>> The state is large, with Minimum=10.2MB and Maximum=49GB
>>>>>>>>>>>> (this one accumulated due to prior failed ones), Average=8.44GB.
>>>>>>>>>>>>
>>>>>>>>>>>> Although the size is huge, I have enough space on the EC2 instance on
>>>>>>>>>>>> which I'm running the job. I'm using RocksDB for checkpointing.
>>>>>>>>>>>>
>>>>>>>>>>>> *The logs do not have any useful information to understand why
>>>>>>>>>>>> checkpoints are expiring/failing. Can someone please point me to tools that
>>>>>>>>>>>> can be used to investigate and understand why checkpoints are failing?*
>>>>>>>>>>>>
>>>>>>>>>>>> Also any other related suggestions are welcome.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Reva.
>>>>>>>>>>>>
>>>>>>>>>>>

Re: Please suggest helpful tools

Posted by RKandoji <rk...@gmail.com>.
Hi Kurt,

I'm able to see significant improvement after fixing the data skew (by
replacing NULLs with randomly generated values). It is processing a lot more
records than earlier. Earlier, the checkpoint expiration issue caused the job to hang
at ~2 million records; now, after the data skew fix, it's hanging after processing
~4 million records.
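
To illustrate why replacing NULL join keys changes the load distribution: a
keyed join sends every record with the same key to the same subtask, so all
NULL keys end up on one of the 26 subtasks. The sketch below is a simplified
simulation and not Flink's actual partitioner (Flink hashes keys into key
groups; plain hashCode modulo parallelism is used here as a stand-in). The
record counts, the "agent-<n>" keys and the "null-<random>" replacement format
are made up for illustration.

```java
import java.util.Objects;
import java.util.Random;

class NullKeySkewDemo {
    static final int PARALLELISM = 26; // parallelism mentioned in the thread

    // Simplified stand-in for key-based routing (assumption: NOT Flink's real key-group logic).
    static int subtaskFor(String key) {
        return Math.floorMod(Objects.hashCode(key), PARALLELISM);
    }

    // Counts how many records each subtask would receive for the given join keys.
    static int[] distribute(String[] keys) {
        int[] counts = new int[PARALLELISM];
        for (String key : keys) {
            counts[subtaskFor(key)]++;
        }
        return counts;
    }

    // Largest number of records received by any single subtask.
    static int max(int[] counts) {
        int m = 0;
        for (int c : counts) {
            m = Math.max(m, c);
        }
        return m;
    }

    // Hypothetical data: every second record has a NULL join key.
    static String[] sampleKeys(int total, boolean replaceNulls, long seed) {
        Random random = new Random(seed);
        String[] keys = new String[total];
        for (int i = 0; i < total; i++) {
            boolean isNull = (i % 2 == 0);
            if (!isNull) {
                keys[i] = "agent-" + i;
            } else if (replaceNulls) {
                keys[i] = "null-" + random.nextLong(); // random replacement value
            } else {
                keys[i] = null;
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        int total = 100_000;
        System.out.println("max records on one subtask, NULL keys kept:     "
                + max(distribute(sampleKeys(total, false, 42L))));
        System.out.println("max records on one subtask, NULL keys replaced: "
                + max(distribute(sampleKeys(total, true, 42L))));
    }
}
```

With the NULL keys kept, one subtask receives at least half of all records in
this made-up data set; with replacement values the maximum per subtask drops.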

I see there is still HIGH backpressure for some operations. Using the UI I'm
able to see which operations and which subtasks are not able to reach
"100% acknowledged" status, but I'm not sure what to investigate next to fix
it. Could you please suggest what else I should be checking or trying out?
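
As one possible way to follow up on this outside the UI, the same checkpoint
and back pressure information can be pulled from Flink's monitoring REST API.
The sketch below only builds the request URLs and fetches the raw JSON; it
assumes the REST endpoint is reachable at http://localhost:8081 (the default
port), and the job ID, vertex ID and checkpoint ID are placeholders you would
take from the UI. The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class FlinkRestProbe {
    private final String baseUrl; // assumption: e.g. "http://localhost:8081"

    FlinkRestProbe(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // Checkpoint statistics (counts, latest completed/failed) for a job.
    URI checkpointStats(String jobId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/checkpoints");
    }

    // Per-subtask checkpoint details for one operator (vertex) of one checkpoint.
    URI checkpointSubtasks(String jobId, long checkpointId, String vertexId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/checkpoints/details/"
                + checkpointId + "/subtasks/" + vertexId);
    }

    // Back pressure sampling result for one operator (vertex).
    URI backPressure(String jobId, String vertexId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure");
    }

    // Plain HTTP GET that returns the response body as text (raw JSON, not parsed here).
    static String fetch(URI uri) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java FlinkRestProbe <jobId> <vertexId>
        if (args.length < 2) {
            System.out.println("usage: FlinkRestProbe <jobId> <vertexId>");
            return;
        }
        FlinkRestProbe probe = new FlinkRestProbe("http://localhost:8081");
        System.out.println(fetch(probe.checkpointStats(args[0])));
        System.out.println(fetch(probe.backPressure(args[0], args[1])));
    }
}
```

Polling the back pressure URL for the operators that fail to acknowledge may
help narrow down which operator is the slow one.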

[image: image.png]

Thanks,
Eva



Re: Please suggest helpful tools

Posted by Eva Eva <et...@gmail.com>.
OK, I'll give it a try and let you know how it goes. At the moment I'm using randomly
generated values to replace nulls, and it's helping with the data skew.
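
For reference, the difference Kurt describes in the quoted message can be
reproduced with a tiny in-memory model. This is only a sketch of left outer
join semantics, not Flink code; the class name, method names and sample keys
are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LeftJoinNullDemo {
    // Models: L LEFT OUTER JOIN R ON L.k1 = R.k1 AND L.k1 IS NOT NULL
    // Every left row is emitted; rows with a NULL key are simply never matched.
    static List<String> joinWithOnClauseFilter(List<String> leftKeys, Map<String, String> right) {
        List<String> out = new ArrayList<>();
        for (String k : leftKeys) {
            boolean matched = k != null && right.containsKey(k);
            out.add(k + " -> " + (matched ? right.get(k) : "NULL"));
        }
        return out;
    }

    // Models: (SELECT * FROM L WHERE k1 IS NOT NULL) LEFT OUTER JOIN R ON k1 = R.k1
    // Left rows with a NULL key are removed before the join ever sees them.
    static List<String> joinAfterPreFilter(List<String> leftKeys, Map<String, String> right) {
        List<String> out = new ArrayList<>();
        for (String k : leftKeys) {
            if (k == null) {
                continue; // filtered out by the inner query
            }
            out.add(k + " -> " + (right.containsKey(k) ? right.get(k) : "NULL"));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", null, "a2", null);
        Map<String, String> right = new HashMap<>();
        right.put("a1", "Agent One");

        System.out.println("ON-clause filter: " + joinWithOnClauseFilter(left, right));
        System.out.println("Pre-filtered:     " + joinAfterPreFilter(left, right));
    }
}
```

Note that the pre-filtered form also drops the NULL-key rows from the result,
so it is only a drop-in replacement if those rows are not needed downstream.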

Thanks,
Eva


On Thu, Jan 16, 2020 at 2:02 AM Kurt Young <yk...@gmail.com> wrote:

> I just realized that it doesn't work to filter null values in a left outer
> join like this:
>
> select * from L left outer join R on L.k1 = R.k1 AND L.k1 is not null
>
> You will still receive all records from the L table even if L.k1 is null.
> The valid way is to use an inner query to first filter out all null
> values and then do the left outer join, like this:
>
> select * from
>     (select * from L where L.k1 is not null) filteredL
>     left outer join R
>     on filteredL.k1 = R.k1
>
> Best,
> Kurt
>
>
> On Thu, Jan 16, 2020 at 4:19 AM Eva Eva <et...@gmail.com>
> wrote:
>
>> You are correct, there is a difference in the query. Below are the
>> queries with WHERE clause:
>>
>> Deduplicate(keep=[LastRow], key=[ucPKL], order=[PROCTIME]) ->
>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, listAgentKeyL IS NOT NULL AS $f16])
>>
>>
>> Join(joinType=[LeftOuterJoin], where=[((listAgentKeyL = ucPKA) AND
>> $f16)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, $f16, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>> dataA, proctime0, row_num], leftInputSpec=[HasUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
>> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
>> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
>> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
>> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
>> row_num AS row_num0, buyerAgentKeyL IS NOT NULL AS $f24])
>>
>>
>> Join(joinType=[LeftOuterJoin], where=[((buyerAgentKeyL = ucPKA0) AND
>> $f24)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>> dataA, proctime0, row_num0, $f24, ucPKA0, ucUpdateTSA0, ucVersionA0,
>> ucRowTypeA0, ucTypeA0, dataA0, proctime1, row_num],
>> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>> ucTypeA0, dataA0, proctime1, row_num AS row_num1, coListAgentKeyL IS NOT
>> NULL AS $f32])
>>
>>
>> Join(joinType=[LeftOuterJoin], where=[((coListAgentKeyL = ucPKA1) AND
>> $f32)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>> ucTypeA0, dataA0, proctime1, row_num1, $f32, ucPKA1, ucUpdateTSA1,
>> ucVersionA1, ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num],
>> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
>> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
>> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num AS row_num2,
>> coBuyerAgentKeyL IS NOT NULL AS $f40])
>>
>>
>> Join(joinType=[LeftOuterJoin], where=[((coBuyerAgentKeyL = ucPKA2) AND
>> $f40)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
>> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
>> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
>> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
>> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
>> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
>> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num2, $f40, ucPKA2,
>> ucUpdateTSA2, ucVersionA2, ucRowTypeA2, ucTypeA2, dataA2, proctime3,
>> row_num], leftInputSpec=[HasUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
>> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
>> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
>> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
>> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
>> row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0, ucTypeA0, dataA0,
>> proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1, ucRowTypeA1,
>> ucTypeA1, dataA1, proctime2, row_num2, ucPKA2, ucUpdateTSA2, ucVersionA2,
>> ucRowTypeA2, ucTypeA2, dataA2, proctime3, row_num AS row_num3,
>> listOfficeKeyL IS NOT NULL AS $f48])
>>
>> 1. So the number of records received will remain the same both with and
>> without the NOT NULL filter. For the scenario with the NOT NULL filter, there is an
>> extra field indicating whether the JOIN key is NULL or not.
>> 2. If point #1 above is correct, then a quick question: would this extra field (indicating
>> whether the JOIN key is NULL or not) help in avoiding unnecessary
>> processing for the cases when the JOIN key is null?
>> 3. If point #2 above is correct, then I should have optimal
>> performance even though there is a data skew.
>>
>> Please let me know your thoughts.
>>
>>
>> Thanks,
>> Eva
>>
>> On Wed, Jan 15, 2020 at 3:03 AM Kurt Young <yk...@gmail.com> wrote:
>>
>>> > but I have noticed there is no change in the list of operations with
>>> or without NOT NULL filter for JOIN keys.
>>>
>>> I noticed that the "where" conditions in these two plans are different.
>>> The one without NOT NULL only has "listAgentKeyL = ucPKA", but the one with NOT NULL
>>> has "((listAgentKeyL = ucPKA) AND (...))", which is truncated by the display.
>>> Could you show the full operator names for the version with NOT
>>> NULL?
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Jan 15, 2020 at 2:45 AM Eva Eva <et...@gmail.com>
>>> wrote:
>>>
>>>> Hi Kurt,
>>>>
>>>> Below are the three scenarios that I tested:
>>>>
>>>> 1. Without "NOT NULL" filter for the join key.
>>>> Observation: data skew exists, as expected.
>>>> Sample query:
>>>> "SELECT * FROM latestListings l " +
>>>>
>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>
>>>> [image: image.png]
>>>>
>>>> [image: image.png]
>>>>
>>>> 2. With "NOT NULL" filter for the join key.
>>>> Observation: data skew exists, same as above case 1
>>>> Sample query:
>>>>
>>>> "SELECT * FROM latestListings l " +
>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>
>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>
>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>
>>>> [image: image.png]
>>>>
>>>> [image: image.png]
>>>>
>>>> 3. I replaced all NULL values for Join keys with randomly generated
>>>> values.
>>>> Observation: data skew gone, absence of NULL values resulted in good
>>>> data distribution
>>>> [image: image.png]
>>>>
>>>> [image: image.png]
>>>>
>>>>
>>>> I have pasted pictures to show the number of records and the list of
>>>> operations.
>>>> Not sure if I understood your question correctly, but I have noticed
>>>> there is no change in the list of operations with or without the NOT NULL
>>>> filter for JOIN keys. From the task flow diagram, the JOIN is kicked off after
>>>> Deduplication, without any other operation in between.
>>>>
>>>>
>>>> Thanks,
>>>> Eva
>>>>
>>>> On Mon, Jan 13, 2020 at 8:41 PM Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> First, could you check whether the added filter conditions are executed
>>>>> before the join operators? If they are
>>>>> already pushed down and executed before the join, there should be some real
>>>>> join keys generating the data skew.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Tue, Jan 14, 2020 at 5:09 AM Eva Eva <et...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Kurt,
>>>>>>
>>>>>> Assuming I'm joining two tables, "latestListings" and "latestAgents"
>>>>>> like below:
>>>>>>
>>>>>> "SELECT * FROM latestListings l " +
>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>>>
>>>>>>
>>>>>> In order to avoid joining on NULL keys, are you suggesting that I
>>>>>> change the query as below:
>>>>>>
>>>>>> "SELECT * FROM latestListings l " +
>>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>>>
>>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>>>
>>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>>>
>>>>>>
>>>>>> I tried this but noticed that it didn't work as the data skew (and heavy load on one task) continued. Could you please let me know if I missed anything?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Eva
>>>>>>
>>>>>>
>>>>>> On Sun, Jan 12, 2020 at 8:44 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You can try to filter NULL values with an explicit condition like
>>>>>>> "xxxx is not NULL".
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <
>>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you both for the suggestions.
>>>>>>>> I did a bit more analysis using the UI and identified at least one
>>>>>>>> problem that's occurring with the job right now. Going to fix it first and then
>>>>>>>> take it from there.
>>>>>>>>
>>>>>>>> *Problem that I identified:*
>>>>>>>> I'm running with a parallelism of 26. For the checkpoints that are
>>>>>>>> expiring, one of the JOIN operations finishes at 25/26 (96%)
>>>>>>>> progress, and the corresponding SubTask:21 shows an "n/a" value. For the
>>>>>>>> same operation I also noticed that the load is distributed poorly, with
>>>>>>>> heavy load being fed to SubTask:21.
>>>>>>>> My guess is that a bunch of null values occur in the key of this JOIN
>>>>>>>> operation and are all routed to the same task.
>>>>>>>> Currently I'm using a SQL query, which gives me limited control over
>>>>>>>> handling null values, so I'll try to JOIN programmatically and see if I can
>>>>>>>> avoid the JOIN operation whenever the joining value is null. This should help
>>>>>>>> with better load distribution across subtasks, and may also fix the expiring
>>>>>>>> checkpoint issue.
>>>>>>>>
>>>>>>>> Thanks for the guidance.
>>>>>>>> Eva.
>>>>>>>>
>>>>>>>> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <
>>>>>>>> qcx978132955@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> For an expired checkpoint, you can find something like "Checkpoint
>>>>>>>>> xxx of job xx expired before completing" in jobmanager.log, then you can go
>>>>>>>>> to the checkpoint UI to find which tasks did not ack, and go to these tasks
>>>>>>>>> to see what happened.
>>>>>>>>>
>>>>>>>>> If a checkpoint has been declined, you can find something like
>>>>>>>>> "Decline checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log;
>>>>>>>>> in this case, you can go to the task directly to find out why the
>>>>>>>>> checkpoint failed.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Congxian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jan 10, 2020 at 7:31 PM Yun Tang <my...@live.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Eva
>>>>>>>>>>
>>>>>>>>>> If a checkpoint failed, please check the web UI or the jobmanager log to
>>>>>>>>>> see why it failed; it might have been declined by some specific task.
>>>>>>>>>>
>>>>>>>>>> If a checkpoint expired, you can also use the web UI to see
>>>>>>>>>> which tasks did not respond in time; some hot task might not be able to
>>>>>>>>>> respond in time. Generally speaking, checkpoint expiration is mostly caused by
>>>>>>>>>> back pressure, which keeps the checkpoint barrier from arriving in time.
>>>>>>>>>> Resolving the back pressure could help the checkpoint finish before the timeout.
>>>>>>>>>>
>>>>>>>>>> I think the docs on monitoring checkpoints in the web UI [1] and back
>>>>>>>>>> pressure [2] could help you.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Yun Tang
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Eva Eva <et...@gmail.com>
>>>>>>>>>> *Sent:* Friday, January 10, 2020 10:29
>>>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>>>> *Subject:* Please suggest helpful tools
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm running Flink job on 1.9 version with blink planner.
>>>>>>>>>>
>>>>>>>>>> My checkpoints are timing out intermittently, but as state grows
>>>>>>>>>> they are timing out more and more often eventually killing the job.
>>>>>>>>>>
>>>>>>>>>> Size of the state is large with Minimum=10.2MB and Maximum=49GB
>>>>>>>>>> (this one is accumulated due to prior failed ones), Average=8.44GB.
>>>>>>>>>>
>>>>>>>>>> Although size is huge, I have enough space on EC2 instance in
>>>>>>>>>> which I'm running job. I'm using RocksDB for checkpointing.
>>>>>>>>>>
>>>>>>>>>> *Logs does not have any useful information to understand why
>>>>>>>>>> checkpoints are expiring/failing, can someone please point me to tools that
>>>>>>>>>> can be used to investigate and understand why checkpoints are failing.*
>>>>>>>>>>
>>>>>>>>>> Also any other related suggestions are welcome.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Reva.
>>>>>>>>>>
>>>>>>>>>

Re: Please suggest helpful tools

Posted by Kurt Young <yk...@gmail.com>.
I just realized that a left outer join cannot filter out null values
like this:

select * from L left outer join R on L.k1 = R.k1 AND L.k1 is not null

You will still receive all records from table L, even those where L.k1 is null,
because a left outer join emits every left row whether or not the ON condition holds.
The valid way is to use an inner query that first filters out all null
values and then does the left outer join, like this:

select * from
    (select * from L where L.k1 is not null) filteredL
    left outer join R
    on filteredL.k1 = R.k1
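
For illustration, the difference between the two queries above can be
reproduced on a toy dataset. This is only a minimal sketch: it uses Python's
built-in sqlite3 module as a stand-in for Flink SQL, the table contents are
made up, and only the left outer join semantics carry over, not the behavior
of Flink's planner or runtime.

```python
import sqlite3

# In-memory database with two tiny, made-up tables named after the example.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE L (id INTEGER, k1 TEXT);
    CREATE TABLE R (k1 TEXT, v TEXT);
    INSERT INTO L VALUES (1, 'a'), (2, NULL), (3, NULL), (4, 'b');
    INSERT INTO R VALUES ('a', 'agent-a');
""")

# Predicate in the ON clause: every row of L is still emitted,
# including the rows whose k1 is NULL.
on_clause = conn.execute("""
    SELECT L.id, L.k1, R.v
    FROM L LEFT OUTER JOIN R
      ON L.k1 = R.k1 AND L.k1 IS NOT NULL
    ORDER BY L.id
""").fetchall()

# Filter L in a subquery first: NULL-key rows never reach the join.
prefiltered = conn.execute("""
    SELECT filteredL.id, filteredL.k1, R.v
    FROM (SELECT * FROM L WHERE L.k1 IS NOT NULL) filteredL
    LEFT OUTER JOIN R
      ON filteredL.k1 = R.k1
    ORDER BY filteredL.id
""").fetchall()

print(on_clause)
print(prefiltered)
```

Note that the pre-filtered form also drops the null-key rows from the result,
so it only fits if those rows are not needed downstream.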

Best,
Kurt


On Thu, Jan 16, 2020 at 4:19 AM Eva Eva <et...@gmail.com>
wrote:

> You are correct, there is a difference in the query. Below are the
> queries with WHERE clause:
>
> Deduplicate(keep=[LastRow], key=[ucPKL], order=[PROCTIME]) ->
> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
> dataL, proctime, listAgentKeyL IS NOT NULL AS $f16])
>
>
> Join(joinType=[LeftOuterJoin], where=[((listAgentKeyL = ucPKA) AND $f16)],
> select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
> $f16, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA,
> proctime0, row_num], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
> row_num AS row_num0, buyerAgentKeyL IS NOT NULL AS $f24])
>
>
> Join(joinType=[LeftOuterJoin], where=[((buyerAgentKeyL = ucPKA0) AND
> $f24)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
> dataA, proctime0, row_num0, $f24, ucPKA0, ucUpdateTSA0, ucVersionA0,
> ucRowTypeA0, ucTypeA0, dataA0, proctime1, row_num],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
> ucTypeA0, dataA0, proctime1, row_num AS row_num1, coListAgentKeyL IS NOT
> NULL AS $f32])
>
>
> Join(joinType=[LeftOuterJoin], where=[((coListAgentKeyL = ucPKA1) AND
> $f32)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
> ucTypeA0, dataA0, proctime1, row_num1, $f32, ucPKA1, ucUpdateTSA1,
> ucVersionA1, ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
> Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num AS row_num2,
> coBuyerAgentKeyL IS NOT NULL AS $f40])
>
>
> Join(joinType=[LeftOuterJoin], where=[((coBuyerAgentKeyL = ucPKA2) AND
> $f40)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
> listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
> listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
> dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
> dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
> ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
> ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num2, $f40, ucPKA2,
> ucUpdateTSA2, ucVersionA2, ucRowTypeA2, ucTypeA2, dataA2, proctime3,
> row_num], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
> ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
> buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
> buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
> ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
> row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0, ucTypeA0, dataA0,
> proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1, ucRowTypeA1,
> ucTypeA1, dataA1, proctime2, row_num2, ucPKA2, ucUpdateTSA2, ucVersionA2,
> ucRowTypeA2, ucTypeA2, dataA2, proctime3, row_num AS row_num3,
> listOfficeKeyL IS NOT NULL AS $f48])
>
> 1. So the number of records received will remain same for both with and
> without NOT NULL filter. For the scenario with NOT NULL filter, there is an
> extra field indicating whether the JOIN key is NULL or not.
> 2. If the above #1 point is correct, then a qq, would this extra field (indicating
> whether the JOIN key is NULL or not) help in avoiding unnecessary
> processing for the cases when the JOIN key is null?
> 3. If the above #2 point is correct, then I should have optimal
> performance even though there is a data skew.
>
> Please let me know your thoughts?
>
>
> Thanks,
> Eva
>
> On Wed, Jan 15, 2020 at 3:03 AM Kurt Young <yk...@gmail.com> wrote:
>
>> > but I have noticed there is no change in the list of operations with
>> or without NOT NULL filter for JOIN keys.
>>
>> I noticed that the "where" condition in these two plans are different.
>> The without NOT NULL only has "listAgentKeyL = ucPKA" but the with NOT NULL
>> has "((listAgentKeyL = ucPKA) AND (...))" but truncated by the display.
>> Could you show the full name of operators for the version which with NOT
>> NULL?
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Jan 15, 2020 at 2:45 AM Eva Eva <et...@gmail.com>
>> wrote:
>>
>>> Hi Kurt,
>>>
>>> Below are the three scenarios that I tested:
>>>
>>> 1. Without "NOT NULL" filter for the join key.
>>> Observation: data skew exists, as expected.
>>> Sample query:
>>> "SELECT * FROM latestListings l " +
>>>
>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>
>>> [image: image.png]
>>>
>>> [image: image.png]
>>>
>>> 2. With "NOT NULL" filter for the join key.
>>> Observation: data skew exists, same as above case 1
>>> Sample query:
>>>
>>> "SELECT * FROM latestListings l " +
>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>
>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>
>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>
>>> [image: image.png]
>>>
>>> [image: image.png]
>>>
>>> 3. I replaced all NULL values for Join keys with randomly generated
>>> values.
>>> Observation: data skew gone, absence of NULL values resulted in good
>>> data distribution
>>> [image: image.png]
>>>
>>> [image: image.png]
>>>
>>>
>>> I have pasted pictures to show the number of records and the list of
>>> operations.
>>> Not sure if I understood your question correctly, but I have noticed
>>> there is no change in the list of operations with or without NOT NULL
>>> filter for JOIN keys. From the task flow diagram, JOIN is kicked after
>>> DeDuplication, without any other operation in between.
>>>
>>>
>>> Thanks,
>>> Eva
>>>
>>> On Mon, Jan 13, 2020 at 8:41 PM Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> First could you check whether the added filter conditions are executed
>>>> before join operators? If they are
>>>> already pushed down and executed before join, it's should be some real
>>>> join keys generating data skew.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Tue, Jan 14, 2020 at 5:09 AM Eva Eva <et...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Kurt,
>>>>>
>>>>> Assuming I'm joining two tables, "latestListings" and "latestAgents"
>>>>> like below:
>>>>>
>>>>> "SELECT * FROM latestListings l " +
>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>>
>>>>>
>>>>> In order to avoid joining on NULL keys, are you suggesting that I
>>>>> change the query as below:
>>>>>
>>>>> "SELECT * FROM latestListings l " +
>>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>>
>>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>>
>>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>>
>>>>>
>>>>> I tried this but noticed that it didn't work as the data skew (and heavy load on one task) continued. Could you please let me know if I missed anything?
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Eva
>>>>>
>>>>>
>>>>> On Sun, Jan 12, 2020 at 8:44 PM Kurt Young <yk...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> You can try to filter NULL values with an explicit condition like
>>>>>> "xxxx is not NULL".
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <
>>>>>> eternalsunshine2709@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you both for the suggestions.
>>>>>>> I did a bit more analysis using UI and identified at least one
>>>>>>> problem that's occurring with the job rn. Going to fix it first and then
>>>>>>> take it from there.
>>>>>>>
>>>>>>> *Problem that I identified:*
>>>>>>> I'm running with 26 parallelism. For the checkpoints that are
>>>>>>> expiring, one of a JOIN operation is finishing at 25/26 (96%)
>>>>>>> progress with corresponding SubTask:21 has "n/a" value. For the
>>>>>>> same operation I also noticed that the load is distributed poorly with
>>>>>>> heavy load being fed to SubTask:21.
>>>>>>> My guess is bunch of null values are happening for this JOIN
>>>>>>> operation and being put into the same task.
>>>>>>> Currently I'm using SQL query which gives me limited control on
>>>>>>> handling null values so I'll try to programmatically JOIN and see if I can
>>>>>>> avoid JOIN operation whenever the joining value is null. This should help
>>>>>>> with better load distribution across subtasks. And may also fix expiring
>>>>>>> checkpointing issue.
>>>>>>>
>>>>>>> Thanks for the guidance.
>>>>>>> Eva.
>>>>>>>
>>>>>>> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <qc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> For expired checkpoint, you can find something like " Checkpoint
>>>>>>>> xxx of job xx expired before completing" in jobmanager.log, then you can go
>>>>>>>> to the checkpoint UI to find which tasks did not ack, and go to these tasks
>>>>>>>> to see what happened.
>>>>>>>>
>>>>>>>> If checkpoint was been declined, you can find something like
>>>>>>>> "Decline checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log,
>>>>>>>> in this case, you can go to the task directly to find out why the
>>>>>>>> checkpoint failed.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Congxian
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jan 10, 2020 at 7:31 PM Yun Tang <my...@live.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Eva
>>>>>>>>>
>>>>>>>>> If checkpoint failed, please view the web UI or jobmanager log to
>>>>>>>>> see why checkpoint failed, might be declined by some specific task.
>>>>>>>>>
>>>>>>>>> If checkpoint expired, you can also access the web UI to see which
>>>>>>>>> tasks did not respond in time, some hot task might not be able to respond
>>>>>>>>> in time. Generally speaking, checkpoint expired is mostly caused by back
>>>>>>>>> pressure which led the checkpoint barrier did not arrive in time. Resolve
>>>>>>>>> the back pressure could help the checkpoint finished before timeout.
>>>>>>>>>
>>>>>>>>> I think the doc of monitoring web UI for checkpoint [1] and back
>>>>>>>>> pressure [2] could help you.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Yun Tang
>>>>>>>>> ------------------------------
>>>>>>>>> *From:* Eva Eva <et...@gmail.com>
>>>>>>>>> *Sent:* Friday, January 10, 2020 10:29
>>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>>> *Subject:* Please suggest helpful tools
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm running Flink job on 1.9 version with blink planner.
>>>>>>>>>
>>>>>>>>> My checkpoints are timing out intermittently, but as state grows
>>>>>>>>> they are timing out more and more often eventually killing the job.
>>>>>>>>>
>>>>>>>>> Size of the state is large with Minimum=10.2MB and Maximum=49GB
>>>>>>>>> (this one is accumulated due to prior failed ones), Average=8.44GB.
>>>>>>>>>
>>>>>>>>> Although size is huge, I have enough space on EC2 instance in
>>>>>>>>> which I'm running job. I'm using RocksDB for checkpointing.
>>>>>>>>>
>>>>>>>>> *Logs does not have any useful information to understand why
>>>>>>>>> checkpoints are expiring/failing, can someone please point me to tools that
>>>>>>>>> can be used to investigate and understand why checkpoints are failing.*
>>>>>>>>>
>>>>>>>>> Also any other related suggestions are welcome.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Reva.
>>>>>>>>>
>>>>>>>>

Re: Please suggest helpful tools

Posted by Eva Eva <et...@gmail.com>.
You are correct, the two plans differ. Below are the full operator names for
the version with the NOT NULL filter, including the "where" conditions:

Deduplicate(keep=[LastRow], key=[ucPKL], order=[PROCTIME]) ->
Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
dataL, proctime, listAgentKeyL IS NOT NULL AS $f16])


Join(joinType=[LeftOuterJoin], where=[((listAgentKeyL = ucPKA) AND $f16)],
select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
$f16, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA,
proctime0, row_num], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
row_num AS row_num0, buyerAgentKeyL IS NOT NULL AS $f24])


Join(joinType=[LeftOuterJoin], where=[((buyerAgentKeyL = ucPKA0) AND
$f24)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
dataA, proctime0, row_num0, $f24, ucPKA0, ucUpdateTSA0, ucVersionA0,
ucRowTypeA0, ucTypeA0, dataA0, proctime1, row_num],
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
ucTypeA0, dataA0, proctime1, row_num AS row_num1, coListAgentKeyL IS NOT
NULL AS $f32])


Join(joinType=[LeftOuterJoin], where=[((coListAgentKeyL = ucPKA1) AND
$f32)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
ucTypeA0, dataA0, proctime1, row_num1, $f32, ucPKA1, ucUpdateTSA1,
ucVersionA1, ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num],
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) ->
Calc(select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num AS row_num2,
coBuyerAgentKeyL IS NOT NULL AS $f40])


Join(joinType=[LeftOuterJoin], where=[((coBuyerAgentKeyL = ucPKA2) AND
$f40)], select=[ucPKL, ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL,
listAgentKeyL, buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL,
listOfficeKeyL, buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL,
dataL, proctime, ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA,
dataA, proctime0, row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0,
ucTypeA0, dataA0, proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1,
ucRowTypeA1, ucTypeA1, dataA1, proctime2, row_num2, $f40, ucPKA2,
ucUpdateTSA2, ucVersionA2, ucRowTypeA2, ucTypeA2, dataA2, proctime3,
row_num], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ucPKL,
ucUpdateTSL, ucVersionL, ucRowTypeL, ucTypeL, listAgentKeyL,
buyerAgentKeyL, coListAgentKeyL, coBuyerAgentKeyL, listOfficeKeyL,
buyerOfficeKeyL, coListOfficeKeyL, coBuyerOfficeKeyL, dataL, proctime,
ucPKA, ucUpdateTSA, ucVersionA, ucRowTypeA, ucTypeA, dataA, proctime0,
row_num0, ucPKA0, ucUpdateTSA0, ucVersionA0, ucRowTypeA0, ucTypeA0, dataA0,
proctime1, row_num1, ucPKA1, ucUpdateTSA1, ucVersionA1, ucRowTypeA1,
ucTypeA1, dataA1, proctime2, row_num2, ucPKA2, ucUpdateTSA2, ucVersionA2,
ucRowTypeA2, ucTypeA2, dataA2, proctime3, row_num AS row_num3,
listOfficeKeyL IS NOT NULL AS $f48])

1. So the number of records received remains the same with and
without the NOT NULL filter. With the NOT NULL filter, there is an
extra field indicating whether the JOIN key is NULL or not.
2. If point #1 is correct, a quick question: would this extra field
(indicating whether the JOIN key is NULL or not) help avoid unnecessary
processing when the JOIN key is null?
3. If point #2 is correct, then I should get optimal performance
even though there is data skew.
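
One way to reason about the three points above is a rough simulation of how
join keys might be spread over 26 subtasks. This is a sketch under stated
assumptions: `subtask_for` is a hypothetical stand-in (CRC32 modulo
parallelism), not Flink's actual key-group assignment, and the 60% share of
NULL keys is made up. It suggests that all NULL keys land on the same subtask
regardless of any extra flag column, and that replacing NULLs with random
values spreads them out.

```python
import random
import zlib
from collections import Counter

PARALLELISM = 26  # parallelism reported earlier in the thread


def subtask_for(key):
    """Hypothetical stand-in for hash partitioning: stable hash modulo parallelism."""
    return zlib.crc32(repr(key).encode("utf-8")) % PARALLELISM


def load_per_subtask(keys):
    """Count how many records each simulated subtask would receive."""
    counts = Counter(subtask_for(k) for k in keys)
    return [counts.get(i, 0) for i in range(PARALLELISM)]


rng = random.Random(42)
# Made-up data: 10,000 rows, roughly 60% of them with a NULL join key.
keys = [None if rng.random() < 0.6 else f"agent-{rng.randrange(5000)}"
        for _ in range(10_000)]

# An extra "key IS NOT NULL" flag does not change the partitioning key,
# so the routing is the same with or without it.
skewed = load_per_subtask(keys)

# Replace each NULL with a random value that cannot match a real key.
salted_keys = [k if k is not None else f"null-{rng.getrandbits(64)}" for k in keys]
salted = load_per_subtask(salted_keys)

print("max load, NULL keys kept:  ", max(skewed))
print("max load, NULL keys salted:", max(salted))
```

Under these assumptions the busiest subtask receives at least every NULL-key
record in the first case, which matches the skew seen on a single subtask.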

Please let me know your thoughts.


Thanks,
Eva

On Wed, Jan 15, 2020 at 3:03 AM Kurt Young <yk...@gmail.com> wrote:

> > but I have noticed there is no change in the list of operations with or
> without NOT NULL filter for JOIN keys.
>
> I noticed that the "where" condition in these two plans are different. The
> without NOT NULL only has "listAgentKeyL = ucPKA" but the with NOT NULL
> has "((listAgentKeyL = ucPKA) AND (...))" but truncated by the display.
> Could you show the full name of operators for the version which with NOT
> NULL?
>
> Best,
> Kurt
>
>
> On Wed, Jan 15, 2020 at 2:45 AM Eva Eva <et...@gmail.com>
> wrote:
>
>> Hi Kurt,
>>
>> Below are the three scenarios that I tested:
>>
>> 1. Without "NOT NULL" filter for the join key.
>> Observation: data skew exists, as expected.
>> Sample query:
>> "SELECT * FROM latestListings l " +
>>
>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>
>> [image: image.png]
>>
>> [image: image.png]
>>
>> 2. With "NOT NULL" filter for the join key.
>> Observation: data skew exists, same as above case 1
>> Sample query:
>>
>> "SELECT * FROM latestListings l " +
>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>
>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>
>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>
>> [image: image.png]
>>
>> [image: image.png]
>>
>> 3. I replaced all NULL values for Join keys with randomly generated
>> values.
>> Observation: data skew gone, absence of NULL values resulted in good
>> data distribution
>> [image: image.png]
>>
>> [image: image.png]
>>
>>
>> I have pasted pictures to show the number of records and the list of
>> operations.
>> Not sure if I understood your question correctly, but I have noticed
>> there is no change in the list of operations with or without NOT NULL
>> filter for JOIN keys. From the task flow diagram, JOIN is kicked after
>> DeDuplication, without any other operation in between.
>>
>>
>> Thanks,
>> Eva
>>
>> On Mon, Jan 13, 2020 at 8:41 PM Kurt Young <yk...@gmail.com> wrote:
>>
>>> First could you check whether the added filter conditions are executed
>>> before join operators? If they are
>>> already pushed down and executed before join, it's should be some real
>>> join keys generating data skew.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Tue, Jan 14, 2020 at 5:09 AM Eva Eva <et...@gmail.com>
>>> wrote:
>>>
>>>> Hi Kurt,
>>>>
>>>> Assuming I'm joining two tables, "latestListings" and "latestAgents"
>>>> like below:
>>>>
>>>> "SELECT * FROM latestListings l " +
>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>>>>
>>>>
>>>> In order to avoid joining on NULL keys, are you suggesting that I
>>>> change the query as below:
>>>>
>>>> "SELECT * FROM latestListings l " +
>>>>         "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
>>>>
>>>>         "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
>>>>
>>>>         "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +
>>>>
>>>>
>>>> I tried this but noticed that it didn't work as the data skew (and heavy load on one task) continued. Could you please let me know if I missed anything?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Eva
>>>>
>>>>
>>>> On Sun, Jan 12, 2020 at 8:44 PM Kurt Young <yk...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You can try to filter NULL values with an explicit condition like
>>>>> "xxxx is not NULL".
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <et...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thank you both for the suggestions.
>>>>>> I did a bit more analysis using UI and identified at least one
>>>>>> problem that's occurring with the job rn. Going to fix it first and then
>>>>>> take it from there.
>>>>>>
>>>>>> *Problem that I identified:*
>>>>>> I'm running with 26 parallelism. For the checkpoints that are
>>>>>> expiring, one of a JOIN operation is finishing at 25/26 (96%)
>>>>>> progress with corresponding SubTask:21 has "n/a" value. For the same
>>>>>> operation I also noticed that the load is distributed poorly with heavy
>>>>>> load being fed to SubTask:21.
>>>>>> My guess is bunch of null values are happening for this JOIN
>>>>>> operation and being put into the same task.
>>>>>> Currently I'm using SQL query which gives me limited control on
>>>>>> handling null values so I'll try to programmatically JOIN and see if I can
>>>>>> avoid JOIN operation whenever the joining value is null. This should help
>>>>>> with better load distribution across subtasks. And may also fix expiring
>>>>>> checkpointing issue.
>>>>>>
>>>>>> Thanks for the guidance.
>>>>>> Eva.
>>>>>>
>>>>>> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <qc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> For expired checkpoint, you can find something like " Checkpoint xxx
>>>>>>> of job xx expired before completing" in jobmanager.log, then you can go to
>>>>>>> the checkpoint UI to find which tasks did not ack, and go to these tasks to
>>>>>>> see what happened.
>>>>>>>
>>>>>>> If checkpoint was been declined, you can find something like
>>>>>>> "Decline checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log,
>>>>>>> in this case, you can go to the task directly to find out why the
>>>>>>> checkpoint failed.
>>>>>>>
>>>>>>> Best,
>>>>>>> Congxian
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jan 10, 2020 at 7:31 PM Yun Tang <my...@live.com> wrote:
>>>>>>>
>>>>>>>> Hi Eva
>>>>>>>>
>>>>>>>> If checkpoint failed, please view the web UI or jobmanager log to
>>>>>>>> see why checkpoint failed, might be declined by some specific task.
>>>>>>>>
>>>>>>>> If checkpoint expired, you can also access the web UI to see which
>>>>>>>> tasks did not respond in time, some hot task might not be able to respond
>>>>>>>> in time. Generally speaking, checkpoint expired is mostly caused by back
>>>>>>>> pressure which led the checkpoint barrier did not arrive in time. Resolve
>>>>>>>> the back pressure could help the checkpoint finished before timeout.
>>>>>>>>
>>>>>>>> I think the doc of monitoring web UI for checkpoint [1] and back
>>>>>>>> pressure [2] could help you.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Yun Tang
>>>>>>>> ------------------------------
>>>>>>>> *From:* Eva Eva <et...@gmail.com>
>>>>>>>> *Sent:* Friday, January 10, 2020 10:29
>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>> *Subject:* Please suggest helpful tools
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm running Flink job on 1.9 version with blink planner.
>>>>>>>>
>>>>>>>> My checkpoints are timing out intermittently, but as state grows
>>>>>>>> they are timing out more and more often eventually killing the job.
>>>>>>>>
>>>>>>>> Size of the state is large with Minimum=10.2MB and Maximum=49GB
>>>>>>>> (this one is accumulated due to prior failed ones), Average=8.44GB.
>>>>>>>>
>>>>>>>> Although size is huge, I have enough space on EC2 instance in which
>>>>>>>> I'm running job. I'm using RocksDB for checkpointing.
>>>>>>>>
>>>>>>>> *Logs does not have any useful information to understand why
>>>>>>>> checkpoints are expiring/failing, can someone please point me to tools that
>>>>>>>> can be used to investigate and understand why checkpoints are failing.*
>>>>>>>>
>>>>>>>> Also any other related suggestions are welcome.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Reva.
>>>>>>>>
>>>>>>>

Re: Please suggest helpful tools

Posted by Kurt Young <yk...@gmail.com>.
> but I have noticed there is no change in the list of operations with or
> without NOT NULL filter for JOIN keys.

I noticed that the "where" conditions in these two plans are different. The
plan without NOT NULL only has "listAgentKeyL = ucPKA", but the one with NOT NULL
has "((listAgentKeyL = ucPKA) AND (...))", which is truncated by the display.
Could you show the full operator names for the version with NOT
NULL?

Best,
Kurt



Re: Please suggest helpful tools

Posted by Eva Eva <et...@gmail.com>.
Hi Kurt,

Below are the three scenarios that I tested:

1. Without "NOT NULL" filter for the join key.
Observation: data skew exists, as expected.
Sample query:
"SELECT * FROM latestListings l " +
        "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
        "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
        "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +

[image: image.png]

[image: image.png]

2. With "NOT NULL" filter for the join key.
Observation: data skew exists, same as above case 1
Sample query:

"SELECT * FROM latestListings l " +
        "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
        "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
        "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +

[image: image.png]

[image: image.png]

3. I replaced all NULL values for Join keys with randomly generated values.
Observation: data skew gone, absence of NULL values resulted in good data
distribution
[image: image.png]

[image: image.png]


I have pasted pictures to show the number of records and the list of
operations.
I'm not sure I understood your question correctly, but I noticed that the
list of operations does not change with or without the NOT NULL filter on the
JOIN keys. In the task flow diagram, the JOIN starts right after
deduplication, with no other operation in between.


Thanks,
Eva
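
The third scenario above (replacing NULL join keys with random values) can be sketched in plain Java. This is only an illustration under stated assumptions: the "__null__" prefix, the class and method names, and the hash-modulo routing are hypothetical stand-ins, not Flink's actual key distribution and not the code used in the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

class NullKeySaltingDemo {

    // Hypothetical prefix, assumed never to appear in a real agent key.
    static final String SALT_PREFIX = "__null__";

    // Keep real keys untouched; give each NULL key its own random surrogate.
    static String saltIfNull(String key) {
        return key != null ? key : SALT_PREFIX + UUID.randomUUID();
    }

    // Assumed routing model: hash of the key modulo the parallelism.
    // Returns the number of records on the busiest subtask.
    static int maxLoad(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        int max = 0;
        for (String key : keys) {
            int subtask = Math.floorMod(Objects.hashCode(key), parallelism);
            load[subtask]++;
            max = Math.max(max, load[subtask]);
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> raw = new ArrayList<>();
        for (int i = 0; i < 2600; i++) raw.add(null);

        List<String> salted = new ArrayList<>();
        for (String key : raw) salted.add(saltIfNull(key));

        System.out.println("max load, NULL keys:   " + maxLoad(raw, 26));
        System.out.println("max load, salted keys: " + maxLoad(salted, 26));
    }
}
```

The surrogate values must never equal a real key on the other side of the join, otherwise they would produce false matches. If downstream consumers expect NULL in that column, the surrogate would also need to be mapped back to NULL after the join.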


Re: Please suggest helpful tools

Posted by Kurt Young <yk...@gmail.com>.
First, could you check whether the added filter conditions are executed
before the join operators? If they are already pushed down and executed
before the join, then some real join keys must be generating the data skew.

Best,
Kurt



Re: Please suggest helpful tools

Posted by Eva Eva <et...@gmail.com>.
Hi Kurt,

Assuming I'm joining two tables, "latestListings" and "latestAgents" like
below:

"SELECT * FROM latestListings l " +
        "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
        "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
        "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +


In order to avoid joining on NULL keys, are you suggesting that I change
the query as below:

"SELECT * FROM latestListings l " +
        "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND l.listAgentKeyL IS NOT NULL " +
        "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND l.buyerAgentKeyL IS NOT NULL " +
        "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND l.coListAgentKeyL IS NOT NULL" +


I tried this but noticed that it didn't work as the data skew (and
heavy load on one task) continued. Could you please let me know if I
missed anything?


Thanks,

Eva
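
One possible reason the extra ON condition does not remove the skew is that a LEFT JOIN keeps every left-side row, so rows with a NULL key still pass through the join operator. The plain-Java sketch below only models that semantics; the class, method, and sample data are hypothetical and it says nothing about how the planner actually executes the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LeftJoinNullKeyDemo {

    // Models: listings l LEFT JOIN agents a
    //         ON l.key = a.key AND l.key IS NOT NULL
    // Each output entry is "listingKey -> agentName".
    static List<String> leftJoin(List<String> listingKeys, Map<String, String> agents) {
        List<String> out = new ArrayList<>();
        for (String key : listingKeys) {
            // The ON condition only decides whether a match exists.
            String match = (key != null) ? agents.get(key) : null;
            // The left row is emitted either way, padded with null when unmatched.
            out.add(key + " -> " + match);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> agents = new HashMap<>();
        agents.put("k1", "Alice");

        List<String> result = leftJoin(Arrays.asList("k1", null, "k9"), agents);
        // All three left rows survive, including the one with the NULL key.
        result.forEach(System.out::println);
    }
}
```

Because the NULL-key rows are still input to the join, they may still be routed together, which would be consistent with the skew continuing after the ON clause was changed.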



Re: Please suggest helpful tools

Posted by Kurt Young <yk...@gmail.com>.
Hi,

You can try to filter NULL values with an explicit condition like "xxxx is
not NULL".

Best,
Kurt



Re: Please suggest helpful tools

Posted by Eva Eva <et...@gmail.com>.
Thank you both for the suggestions.
I did a bit more analysis using the web UI and identified at least one
problem that is occurring with the job right now. I'm going to fix it first
and then take it from there.

*Problem that I identified:*
I'm running with a parallelism of 26. For the checkpoints that are expiring,
one of the JOIN operations finishes at 25/26 (96%) progress, and the
corresponding SubTask:21 shows an "n/a" value. For the same operation I also
noticed that the load is distributed poorly, with a heavy load being fed to
SubTask:21.
My guess is that a bunch of null values occur in this JOIN operation and are
all being put into the same task.
Currently I'm using a SQL query, which gives me limited control over handling
null values, so I'll try to JOIN programmatically and see if I can skip the
JOIN operation whenever the joining value is null. This should help with
better load distribution across subtasks, and may also fix the expiring
checkpoint issue.

Thanks for the guidance.
Eva.
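
The guess above, that NULL join keys all end up on one subtask, can be illustrated with a small plain-Java simulation. It assumes a simplified routing rule (hash of the key modulo the parallelism); Flink's real key distribution is more involved, and all names and sample keys here are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class NullKeySkewDemo {

    // Assumed stand-in for key-based routing: hash the key, then take it
    // modulo the parallelism. Objects.hashCode(null) is 0, so every NULL key
    // maps to the same subtask.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(Objects.hashCode(key), parallelism);
    }

    // Counts how many records each subtask would receive.
    static int[] loadPerSubtask(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        for (String key : keys) {
            load[subtaskFor(key, parallelism)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        int parallelism = 26;
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) keys.add("agent-" + i); // distinct real keys
        for (int i = 0; i < 5000; i++) keys.add(null);         // rows without an agent

        // One entry of the array is far larger than the others.
        System.out.println(Arrays.toString(loadPerSubtask(keys, parallelism)));
    }
}
```

Under this model, the subtask that receives the NULL keys carries thousands of extra records, which matches the symptom of a single subtask lagging behind during checkpoints.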


Re: Please suggest helpful tools

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

For an expired checkpoint, you can find something like "Checkpoint xxx of job
xx expired before completing" in jobmanager.log. Then you can go to the
checkpoint UI to find which tasks did not ack, and go to these tasks to see
what happened.

If the checkpoint was declined, you can find something like "Decline
checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log. In this
case, you can go to the task directly to find out why the checkpoint failed.

Best,
Congxian
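
The log search described above could be sketched in Java as follows. The two search phrases are taken from the message above; the class name, the enum, and the sample lines (which reuse the "xxx" placeholders rather than real log output) are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;

class CheckpointLogScan {

    enum Outcome { EXPIRED, DECLINED, OTHER }

    // Classifies a single jobmanager.log line by the phrases quoted above.
    static Outcome classify(String line) {
        if (line.contains("expired before completing")) return Outcome.EXPIRED;
        if (line.contains("Decline checkpoint")) return Outcome.DECLINED;
        return Outcome.OTHER;
    }

    public static void main(String[] args) {
        // Placeholder lines, not real log output.
        List<String> lines = Arrays.asList(
                "Checkpoint xxx of job xx expired before completing",
                "Decline checkpoint xxx by task xxx of job xxx at xxx.",
                "some unrelated line");
        for (String line : lines) {
            System.out.println(classify(line) + " <- " + line);
        }
    }
}
```

An EXPIRED line points to the checkpoint UI to find the tasks that did not ack, while a DECLINED line names the task to inspect directly.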


Yun Tang <my...@live.com> 于2020年1月10日周五 下午7:31写道:

> Hi Eva
>
> If checkpoint failed, please view the web UI or jobmanager log to see why
> checkpoint failed, might be declined by some specific task.
>
> If checkpoint expired, you can also access the web UI to see which tasks
> did not respond in time, some hot task might not be able to respond in
> time. Generally speaking, checkpoint expired is mostly caused by back
> pressure which led the checkpoint barrier did not arrive in time. Resolve
> the back pressure could help the checkpoint finished before timeout.
>
> I think the doc of monitoring web UI for checkpoint [1] and back pressure
> [2] could help you.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>
> Best
> Yun Tang
> ------------------------------
> *From:* Eva Eva <et...@gmail.com>
> *Sent:* Friday, January 10, 2020 10:29
> *To:* user <us...@flink.apache.org>
> *Subject:* Please suggest helpful tools
>
> Hi,
>
> I'm running a Flink job on version 1.9 with the Blink planner.
>
> My checkpoints are timing out intermittently, but as state grows they time
> out more and more often, eventually killing the job.
>
> The state is large: Minimum=10.2MB, Maximum=49GB (this one accumulated due
> to prior failed ones), Average=8.44GB.
>
> Although the size is huge, I have enough space on the EC2 instance on which
> I'm running the job. I'm using RocksDB for checkpointing.
>
> *The logs do not have any useful information for understanding why
> checkpoints are expiring/failing. Can someone please point me to tools that
> can be used to investigate and understand why checkpoints are failing?*
>
> Also any other related suggestions are welcome.
>
>
> Thanks,
> Reva.
>

Re: Please suggest helpful tools

Posted by Yun Tang <my...@live.com>.
Hi Eva

If a checkpoint failed, check the web UI or the jobmanager log to see why; it might have been declined by a specific task.

If a checkpoint expired, you can also use the web UI to see which tasks did not respond in time; a hot task might not be able to respond in time. Generally speaking, expired checkpoints are mostly caused by back pressure, which keeps the checkpoint barrier from arriving in time. Resolving the back pressure could help checkpoints finish before the timeout.

I think the docs on monitoring checkpoints in the web UI [1] and on back pressure [2] could help you.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
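
The same checkpoint statistics shown in the web UI can also be pulled from the JobManager's REST endpoint. Below is a minimal Python sketch, not a definitive tool: it assumes the /jobs/<job_id>/checkpoints endpoint and the response field names (history, status, num_subtasks, num_acknowledged_subtasks, end_to_end_duration) as I understand them from the Flink REST API docs, so please verify them against your version. The function names, base URL and job ID are placeholders.

```python
import json
from urllib.request import urlopen


def fetch_checkpoint_stats(base_url, job_id):
    """GET /jobs/<job_id>/checkpoints from the JobManager REST endpoint."""
    with urlopen(f"{base_url}/jobs/{job_id}/checkpoints") as resp:
        return json.load(resp)


def failed_checkpoints(stats):
    """List failed checkpoints from the recent history in the response.

    Field names are assumed from the Flink REST API docs; fields missing
    from the response come back as None.
    """
    rows = []
    for cp in stats.get("history", []):
        if cp.get("status") != "FAILED":
            continue
        rows.append({
            "id": cp.get("id"),
            "acked": cp.get("num_acknowledged_subtasks"),
            "subtasks": cp.get("num_subtasks"),
            "duration_ms": cp.get("end_to_end_duration"),
        })
    return rows
```

For example, failed_checkpoints(fetch_checkpoint_stats("http://localhost:8081", "<job-id>")) lists the recent failed checkpoints; an entry where acked is lower than subtasks suggests that some tasks never acknowledged the checkpoint, and those are the ones worth checking for back pressure.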

Best
Yun Tang
________________________________
From: Eva Eva <et...@gmail.com>
Sent: Friday, January 10, 2020 10:29
To: user <us...@flink.apache.org>
Subject: Please suggest helpful tools

Hi,

I'm running a Flink job on version 1.9 with the Blink planner.

My checkpoints are timing out intermittently, but as state grows they time out more and more often, eventually killing the job.

The state is large: Minimum=10.2MB, Maximum=49GB (this one accumulated due to prior failed ones), Average=8.44GB.

Although the size is huge, I have enough space on the EC2 instance on which I'm running the job. I'm using RocksDB for checkpointing.

The logs do not have any useful information for understanding why checkpoints are expiring/failing. Can someone please point me to tools that can be used to investigate and understand why checkpoints are failing?

Also any other related suggestions are welcome.


Thanks,
Reva.