You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dawid Wysakowicz <dw...@apache.org> on 2018/05/30 07:25:36 UTC

Re: NPE in flink sql over-window

Hi Yan,


I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access
a list of elements that was already cleared and does not check against
null. Could you please file a JIRA for that?


Best,

Dawid


On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's
> printed by JVM and doesn't have timestamp. But I suspect that it's
> because so many failure recoveries from checkpoint and the sql queries
> are dynamically compiled too many times.
>
>
>
> /Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full.
> Compiler has been disabled./
> /Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code
> cache size using -XX:ReservedCodeCacheSize=/
> /CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb/
> /bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]/
> /total_blobs=54308 nmethods=53551 adapters=617/
> /compilation: disabled (not enough contiguous free space left)/
>
>
>
> ------------------------------------------------------------------------
> *From:* Yan Zhou [FDS Science] <yz...@coupang.com>
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>  
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from
> same line of code. 
>
>
> My application read message from kafka, convert the datastream into a
> table, issue an Over-window aggregation and write the result into a
> sink. NPE throws from class ProcTimeBoundedRangeOver. Please see
> exception log at the bottom.
>
>
> The exceptions always happens after the application started
> for /maxIdleStateRetentionTime /time.  What could be the possible causes? 
>
>
> Best
>
> Yan
>
>
> /2018-05-27 11:03:37,656 INFO 
> org.apache.flink.runtime.taskmanager.Task                     - over:
> (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDI/
> /NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS
> w0$o0)) -> select: /
> /(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter
> -> Sink: Unnamed (3/15) (327/
> /efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED./
> /TimerException{java.lang.NullPointerException}/
> /       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)/
> /       at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)/
> /       at java.util.concurrent.FutureTask.run(FutureTask.java:266)/
> /       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)/
> /       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)/
> /       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)/
> /       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)/
> /       at java.lang.Thread.run(Thread.java:748)/
> /Caused by: java.lang.NullPointerException/
> /       at
> org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)/
> /       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)/
> /       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)/
> /       at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)/
> /       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)/
>
>
>


Re: NPE in flink sql over-window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yan,

Thanks for providing the logs and opening the JIRA issue!
Let's continue the discussion there.

Best, Fabian

2018-06-05 1:26 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>:

> Hi Fabian,
>
> I added some trace logs in ProcTimeBoundedRangeOver and think it should
> be a bug. The log should explain how cleanup_time_1 bypasses the needToCleanupState
> check and causes NPE. A jira ticket [1] is created.
>
> Best
> Yan
>
>
> *[ts:1528149296456] [label:state_ttl_update] register for cleanup at
> 1528150096456(CLEANUP_TIME_1), because of Row:(orderId:001,userId:U123)*
> *[ts:1528149296456] [label:register_pt] register for process input at
> 1528149296457, because of Row:(orderId:001,userId:U123)*
> *[ts:1528149296458] [label:state_apply] ontimer at 1528149296457, apply
> Row:(orderId:001,userId:U123) to accumulator*
>
> *[ts:1528149885813] [label:state_ttl_update] register at
> 1528150685813(CLEANUP_TIME_2), because of Row:(orderId:002,userId:U123)*
> *[ts:1528149885813] [label:register_pt] register for process input at
> 1528149885814, because of Row:(orderId:002,userId:U123)*
> *[ts:1528149885814] [label:state_apply] ontimer at 1528149885814, apply
> Row:(orderId:002,userId:U123) to accumulator*
>
> *[ts:1528150096460] [label:NO_ELEMENTS_IN_STATE] ontimer at
> 1528150096456(CLEANUP_TIME_1), bypass needToCleanupState check, however
> rowMapState is {key:1528150096455, value:[]}*
>
> *[ts:1528150685815] [label:state_timeout] ontimer at
> 1528150685813(CLEANUP_TIME_2), clean/empty the rowMapState
> [{key:1528149885813, value:[Row:(orderId:002,userId:U123)]}]*
>
>
>
>
>
>
>
>
> [1] : https://issues.apache.org/jira/browse/FLINK-9524
>
>
> ------------------------------
> *From:* Yan Zhou [FDS Science] <yz...@coupang.com>
> *Sent:* Monday, June 4, 2018 4:05 PM
> *To:* Fabian Hueske
>
> *Cc:* Dawid Wysakowicz; user
> *Subject:* Re: NPE in flink sql over-window
>
>
> Hi Fabian,
>
>
> Yes, the NPE cause the job failure and recovery( instead of being the
> result of a recovery).
>
> And yet, during the recovery, the same exceptions are thrown from same
> line.
>
>
> Best
>
> Yan
> ------------------------------
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Thursday, May 31, 2018 12:09:03 AM
> *To:* Yan Zhou [FDS Science]
> *Cc:* Dawid Wysakowicz; user
> *Subject:* Re: NPE in flink sql over-window
>
> Hi Yan,
>
> Thanks for the details and for digging into the issue.
> If I got it right, the NPE caused the job failure and recovery (instead of
> being the result of a recovery), correct?
>
> Best, Fabian
>
> 2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Thanks for the replay.
>
>
> Yes, it only happen if I config the idle state retention times. The error
> occurs the first time before the first recovery. I haven't run with
> proctime but rowtime in flink 1.4.x. I am not sure if it will cause
> problems with proctime in 1.4.x.
>
>
> I am adding some trace log for ProcTimeBoundedRangeOver. I will update
> with my test result and fire a JIRA after that.
>
>
> Best
>
> Yan
> ------------------------------
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Wednesday, May 30, 2018 1:43:01 AM
> *To:* Dawid Wysakowicz
> *Cc:* user
> *Subject:* Re: NPE in flink sql over-window
>
> Hi,
>
> Dawid's analysis is certainly correct, but looking at the code this should
> not happen.
>
> I have a few questions:
> - You said this only happens if you configure idle state retention times,
> right?
> - Does the error occur the first time without a previous recovery?
> - Did you run the same query on Flink 1.4.x without any problems?
>
> Thanks, Fabian
>
> 2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dw...@apache.org>:
>
> Hi Yan,
>
>
> I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a
> list of elements that was already cleared and does not check against null.
> Could you please file a JIRA for that?
>
>
> Best,
>
> Dawid
>
> On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's printed
> by JVM and doesn't have timestamp. But I suspect that it's because so
> many failure recoveries from checkpoint and the sql queries are dynamically
> compiled too many times.
>
>
>
> *Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler
> has been disabled.*
> *Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache
> size using -XX:ReservedCodeCacheSize=*
> *CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb*
> *bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]*
> *total_blobs=54308 nmethods=53551 adapters=617*
> *compilation: disabled (not enough contiguous free space left)*
>
>
>
> ------------------------------
> *From:* Yan Zhou [FDS Science] <yz...@coupang.com> <yz...@coupang.com>
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from same
> line of code.
>
>
> My application read message from kafka, convert the datastream into a
> table, issue an Over-window aggregation and write the result into a sink.
> NPE throws from class ProcTimeBoundedRangeOver. Please see exception log
> at the bottom.
>
>
> The exceptions always happens after the application started for *maxIdleStateRetentionTime
> *time.  What could be the possible causes?
>
>
> Best
>
> Yan
>
>
> *2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - over: (PARTITION BY: uid, ORDER BY: proctime,
> RANGEBETWEEN 86400000 PRECEDI*
> *NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS
> w0$o0)) -> select: *
> *(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter ->
> Sink: Unnamed (3/15) (327*
> *efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.*
> *TimerException{java.lang.NullPointerException}*
> *       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)*
> *       at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
> *       at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
> *       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)*
> *       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)*
> *       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
> *       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
> *       at java.lang.Thread.run(Thread.java:748)*
> *Caused by: java.lang.NullPointerException*
> *       at
> org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)*
> *       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)*
> *       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)*
> *       at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)*
> *       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)*
>
>
>
>
>
>
>

Re: NPE in flink sql over-window

Posted by "Yan Zhou [FDS Science]" <yz...@coupang.com>.
Hi Fabian,

I added some trace logs in ProcTimeBoundedRangeOver and think it should be a bug. The log should explain how cleanup_time_1 bypasses the needToCleanupState check and causes NPE. A jira ticket [1] is created.

Best
Yan


[ts:1528149296456] [label:state_ttl_update] register for cleanup at 1528150096456(CLEANUP_TIME_1), because of Row:(orderId:001,userId:U123)
[ts:1528149296456] [label:register_pt] register for process input at 1528149296457, because of Row:(orderId:001,userId:U123)
[ts:1528149296458] [label:state_apply] ontimer at 1528149296457, apply Row:(orderId:001,userId:U123) to accumulator

[ts:1528149885813] [label:state_ttl_update] register at 1528150685813(CLEANUP_TIME_2), because of Row:(orderId:002,userId:U123)
[ts:1528149885813] [label:register_pt] register for process input at 1528149885814, because of Row:(orderId:002,userId:U123)
[ts:1528149885814] [label:state_apply] ontimer at 1528149885814, apply Row:(orderId:002,userId:U123) to accumulator

[ts:1528150096460] [label:NO_ELEMENTS_IN_STATE] ontimer at 1528150096456(CLEANUP_TIME_1), bypass needToCleanupState check, however rowMapState is {key:1528150096455, value:[]}

[ts:1528150685815] [label:state_timeout] ontimer at 1528150685813(CLEANUP_TIME_2), clean/empty the rowMapState [{key:1528149885813, value:[Row:(orderId:002,userId:U123)]}]








[1] : https://issues.apache.org/jira/browse/FLINK-9524


________________________________
From: Yan Zhou [FDS Science] <yz...@coupang.com>
Sent: Monday, June 4, 2018 4:05 PM
To: Fabian Hueske
Cc: Dawid Wysakowicz; user
Subject: Re: NPE in flink sql over-window


Hi Fabian,


Yes, the NPE cause the job failure and recovery( instead of being the result of a recovery).

And yet, during the recovery, the same exceptions are thrown from same line.


Best

Yan

________________________________
From: Fabian Hueske <fh...@gmail.com>
Sent: Thursday, May 31, 2018 12:09:03 AM
To: Yan Zhou [FDS Science]
Cc: Dawid Wysakowicz; user
Subject: Re: NPE in flink sql over-window

Hi Yan,

Thanks for the details and for digging into the issue.
If I got it right, the NPE caused the job failure and recovery (instead of being the result of a recovery), correct?

Best, Fabian

2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>>:

Thanks for the replay.


Yes, it only happen if I config the idle state retention times. The error occurs the first time before the first recovery. I haven't run with proctime but rowtime in flink 1.4.x. I am not sure if it will cause problems with proctime in 1.4.x.


I am adding some trace log for ProcTimeBoundedRangeOver. I will update with my test result and fire a JIRA after that.


Best

Yan

________________________________
From: Fabian Hueske <fh...@gmail.com>>
Sent: Wednesday, May 30, 2018 1:43:01 AM
To: Dawid Wysakowicz
Cc: user
Subject: Re: NPE in flink sql over-window

Hi,

Dawid's analysis is certainly correct, but looking at the code this should not happen.

I have a few questions:
- You said this only happens if you configure idle state retention times, right?
- Does the error occur the first time without a previous recovery?
- Did you run the same query on Flink 1.4.x without any problems?

Thanks, Fabian

2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dw...@apache.org>>:

Hi Yan,


I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a list of elements that was already cleared and does not check against null. Could you please file a JIRA for that?


Best,

Dawid

On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:

I also get warnning that CodeCache is full around that time. It's printed by JVM and doesn't have timestamp. But I suspect that it's because so many failure recoveries from checkpoint and the sql queries are dynamically compiled too many times.



Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]
total_blobs=54308 nmethods=53551 adapters=617
compilation: disabled (not enough contiguous free space left)




________________________________
From: Yan Zhou [FDS Science] <yz...@coupang.com>
Sent: Tuesday, May 29, 2018 10:52:18 PM
To: user@flink.apache.org<ma...@flink.apache.org>
Subject: NPE in flink sql over-window


Hi,

I am using flink sql 1.5.0. My application throws NPE. And after it recover from checkpoint automatically, it throws NPE immediately from same line of code.


My application read message from kafka, convert the datastream into a table, issue an Over-window aggregation and write the result into a sink. NPE throws from class ProcTimeBoundedRangeOver. Please see exception log at the bottom.


The exceptions always happens after the application started for maxIdleStateRetentionTime time.  What could be the possible causes?


Best

Yan


2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task                     - over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDI
NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select:
(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: Unnamed (3/15) (327
efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
TimerException{java.lang.NullPointerException}
       at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
       at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
       at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
       at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
       at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
       at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)







Re: NPE in flink sql over-window

Posted by "Yan Zhou [FDS Science]" <yz...@coupang.com>.
Hi Fabian,


Yes, the NPE cause the job failure and recovery( instead of being the result of a recovery).

And yet, during the recovery, the same exceptions are thrown from same line.


Best

Yan

________________________________
From: Fabian Hueske <fh...@gmail.com>
Sent: Thursday, May 31, 2018 12:09:03 AM
To: Yan Zhou [FDS Science]
Cc: Dawid Wysakowicz; user
Subject: Re: NPE in flink sql over-window

Hi Yan,

Thanks for the details and for digging into the issue.
If I got it right, the NPE caused the job failure and recovery (instead of being the result of a recovery), correct?

Best, Fabian

2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>>:

Thanks for the replay.


Yes, it only happen if I config the idle state retention times. The error occurs the first time before the first recovery. I haven't run with proctime but rowtime in flink 1.4.x. I am not sure if it will cause problems with proctime in 1.4.x.


I am adding some trace log for ProcTimeBoundedRangeOver. I will update with my test result and fire a JIRA after that.


Best

Yan

________________________________
From: Fabian Hueske <fh...@gmail.com>>
Sent: Wednesday, May 30, 2018 1:43:01 AM
To: Dawid Wysakowicz
Cc: user
Subject: Re: NPE in flink sql over-window

Hi,

Dawid's analysis is certainly correct, but looking at the code this should not happen.

I have a few questions:
- You said this only happens if you configure idle state retention times, right?
- Does the error occur the first time without a previous recovery?
- Did you run the same query on Flink 1.4.x without any problems?

Thanks, Fabian

2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dw...@apache.org>>:

Hi Yan,


I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a list of elements that was already cleared and does not check against null. Could you please file a JIRA for that?


Best,

Dawid

On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:

I also get warnning that CodeCache is full around that time. It's printed by JVM and doesn't have timestamp. But I suspect that it's because so many failure recoveries from checkpoint and the sql queries are dynamically compiled too many times.



Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]
total_blobs=54308 nmethods=53551 adapters=617
compilation: disabled (not enough contiguous free space left)




________________________________
From: Yan Zhou [FDS Science] <yz...@coupang.com>
Sent: Tuesday, May 29, 2018 10:52:18 PM
To: user@flink.apache.org<ma...@flink.apache.org>
Subject: NPE in flink sql over-window


Hi,

I am using flink sql 1.5.0. My application throws NPE. And after it recover from checkpoint automatically, it throws NPE immediately from same line of code.


My application read message from kafka, convert the datastream into a table, issue an Over-window aggregation and write the result into a sink. NPE throws from class ProcTimeBoundedRangeOver. Please see exception log at the bottom.


The exceptions always happens after the application started for maxIdleStateRetentionTime time.  What could be the possible causes?


Best

Yan


2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task                     - over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDI
NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select:
(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: Unnamed (3/15) (327
efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
TimerException{java.lang.NullPointerException}
       at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
       at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
       at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
       at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
       at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
       at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)







Re: NPE in flink sql over-window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yan,

Thanks for the details and for digging into the issue.
If I got it right, the NPE caused the job failure and recovery (instead of
being the result of a recovery), correct?

Best, Fabian

2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>:

> Thanks for the replay.
>
>
> Yes, it only happen if I config the idle state retention times. The error
> occurs the first time before the first recovery. I haven't run with
> proctime but rowtime in flink 1.4.x. I am not sure if it will cause
> problems with proctime in 1.4.x.
>
>
> I am adding some trace log for ProcTimeBoundedRangeOver. I will update
> with my test result and fire a JIRA after that.
>
>
> Best
>
> Yan
> ------------------------------
> *From:* Fabian Hueske <fh...@gmail.com>
> *Sent:* Wednesday, May 30, 2018 1:43:01 AM
> *To:* Dawid Wysakowicz
> *Cc:* user
> *Subject:* Re: NPE in flink sql over-window
>
> Hi,
>
> Dawid's analysis is certainly correct, but looking at the code this should
> not happen.
>
> I have a few questions:
> - You said this only happens if you configure idle state retention times,
> right?
> - Does the error occur the first time without a previous recovery?
> - Did you run the same query on Flink 1.4.x without any problems?
>
> Thanks, Fabian
>
> 2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dw...@apache.org>:
>
> Hi Yan,
>
>
> I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a
> list of elements that was already cleared and does not check against null.
> Could you please file a JIRA for that?
>
>
> Best,
>
> Dawid
>
> On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's printed
> by JVM and doesn't have timestamp. But I suspect that it's because so
> many failure recoveries from checkpoint and the sql queries are dynamically
> compiled too many times.
>
>
>
> *Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler
> has been disabled.*
> *Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache
> size using -XX:ReservedCodeCacheSize=*
> *CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb*
> *bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]*
> *total_blobs=54308 nmethods=53551 adapters=617*
> *compilation: disabled (not enough contiguous free space left)*
>
>
>
> ------------------------------
> *From:* Yan Zhou [FDS Science] <yz...@coupang.com> <yz...@coupang.com>
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from same
> line of code.
>
>
> My application read message from kafka, convert the datastream into a
> table, issue an Over-window aggregation and write the result into a sink.
> NPE throws from class ProcTimeBoundedRangeOver. Please see exception log
> at the bottom.
>
>
> The exceptions always happens after the application started for *maxIdleStateRetentionTime
> *time.  What could be the possible causes?
>
>
> Best
>
> Yan
>
>
> *2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - over: (PARTITION BY: uid, ORDER BY: proctime,
> RANGEBETWEEN 86400000 PRECEDI*
> *NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS
> w0$o0)) -> select: *
> *(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter ->
> Sink: Unnamed (3/15) (327*
> *efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.*
> *TimerException{java.lang.NullPointerException}*
> *       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)*
> *       at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
> *       at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
> *       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)*
> *       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)*
> *       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
> *       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
> *       at java.lang.Thread.run(Thread.java:748)*
> *Caused by: java.lang.NullPointerException*
> *       at
> org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)*
> *       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)*
> *       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)*
> *       at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)*
> *       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)*
>
>
>
>
>
>

Re: NPE in flink sql over-window

Posted by "Yan Zhou [FDS Science]" <yz...@coupang.com>.
Thanks for the replay.


Yes, it only happen if I config the idle state retention times. The error occurs the first time before the first recovery. I haven't run with proctime but rowtime in flink 1.4.x. I am not sure if it will cause problems with proctime in 1.4.x.


I am adding some trace log for ProcTimeBoundedRangeOver. I will update with my test result and fire a JIRA after that.


Best

Yan

________________________________
From: Fabian Hueske <fh...@gmail.com>
Sent: Wednesday, May 30, 2018 1:43:01 AM
To: Dawid Wysakowicz
Cc: user
Subject: Re: NPE in flink sql over-window

Hi,

Dawid's analysis is certainly correct, but looking at the code this should not happen.

I have a few questions:
- You said this only happens if you configure idle state retention times, right?
- Does the error occur the first time without a previous recovery?
- Did you run the same query on Flink 1.4.x without any problems?

Thanks, Fabian

2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dw...@apache.org>>:

Hi Yan,


I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a list of elements that was already cleared and does not check against null. Could you please file a JIRA for that?


Best,

Dawid

On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:

I also get warnning that CodeCache is full around that time. It's printed by JVM and doesn't have timestamp. But I suspect that it's because so many failure recoveries from checkpoint and the sql queries are dynamically compiled too many times.



Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]
total_blobs=54308 nmethods=53551 adapters=617
compilation: disabled (not enough contiguous free space left)




________________________________
From: Yan Zhou [FDS Science] <yz...@coupang.com>
Sent: Tuesday, May 29, 2018 10:52:18 PM
To: user@flink.apache.org<ma...@flink.apache.org>
Subject: NPE in flink sql over-window


Hi,

I am using flink sql 1.5.0. My application throws NPE. And after it recover from checkpoint automatically, it throws NPE immediately from same line of code.


My application read message from kafka, convert the datastream into a table, issue an Over-window aggregation and write the result into a sink. NPE throws from class ProcTimeBoundedRangeOver. Please see exception log at the bottom.


The exceptions always happens after the application started for maxIdleStateRetentionTime time.  What could be the possible causes?


Best

Yan


2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task                     - over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDI
NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select:
(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: Unnamed (3/15) (327
efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
TimerException{java.lang.NullPointerException}
       at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
       at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
       at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
       at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
       at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
       at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)






Re: NPE in flink sql over-window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Dawid's analysis is certainly correct, but looking at the code this should
not happen.

I have a few questions:
- You said this only happens if you configure idle state retention times,
right?
- Does the error occur the first time without a previous recovery?
- Did you run the same query on Flink 1.4.x without any problems?

Thanks, Fabian

2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dw...@apache.org>:

> Hi Yan,
>
>
> I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a
> list of elements that was already cleared and does not check against null.
> Could you please file a JIRA for that?
>
>
> Best,
>
> Dawid
>
> On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's printed
> by JVM and doesn't have timestamp. But I suspect that it's because so
> many failure recoveries from checkpoint and the sql queries are dynamically
> compiled too many times.
>
>
>
> *Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler
> has been disabled.*
> *Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache
> size using -XX:ReservedCodeCacheSize=*
> *CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb*
> *bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]*
> *total_blobs=54308 nmethods=53551 adapters=617*
> *compilation: disabled (not enough contiguous free space left)*
>
>
>
> ------------------------------
> *From:* Yan Zhou [FDS Science] <yz...@coupang.com> <yz...@coupang.com>
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from same
> line of code.
>
>
> My application read message from kafka, convert the datastream into a
> table, issue an Over-window aggregation and write the result into a sink.
> NPE throws from class ProcTimeBoundedRangeOver. Please see exception log
> at the bottom.
>
>
> The exceptions always happens after the application started for *maxIdleStateRetentionTime
> *time.  What could be the possible causes?
>
>
> Best
>
> Yan
>
>
> *2018-05-27 11:03:37,656 INFO  org.apache.flink.runtime.taskmanager.Task
>                    - over: (PARTITION BY: uid, ORDER BY: proctime,
> RANGEBETWEEN 86400000 PRECEDI*
> *NG AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS
> w0$o0)) -> select: *
> *(id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter ->
> Sink: Unnamed (3/15) (327*
> *efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.*
> *TimerException{java.lang.NullPointerException}*
> *       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)*
> *       at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
> *       at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
> *       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)*
> *       at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)*
> *       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
> *       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
> *       at java.lang.Thread.run(Thread.java:748)*
> *Caused by: java.lang.NullPointerException*
> *       at
> org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)*
> *       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)*
> *       at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)*
> *       at
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)*
> *       at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)*
>
>
>
>
>