You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dave Marion <dl...@comcast.net> on 2017/01/19 16:57:06 UTC

NPE in JobManager

I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue where after some period of time (measured in 1 - 3 hours) the JobManager gets an NPE and shuts itself down. The failure is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom accumulator[1], but can't tell from the JobManager code whether the issue is in my Accumulator, or is a bug in the JobManager.


[1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java

Re: Re: NPE in JobManager

Posted by Dave Marion <dl...@comcast.net>.
Fixing my accumulator did the trick. I should note that the JobManager did not fail when I ran this previously against Flink 1.1.3. Thanks for the help!

Dave


> On January 20, 2017 at 8:45 AM Dave Marion <dl...@comcast.net> wrote:
> 
>     I do see that message in one of the task manager logs 20ms before the NPE in the JobManager. Looking in that log, there is a ConcurrentModificationException in TreeMap, which my accumulator uses. I'll track this down, thanks for the pointer.
> 
> 
>         > > On January 20, 2017 at 8:27 AM Stephan Ewen <se...@apache.org> wrote:
> > 
> >         Hi!
> > 
> >         My current assumption is that there is an accumulator that cannot be serialized. The SortedStringAccumulator looks fine at a first glance, but are there other accumulators involved?
> >         Do you see a message like that one in the log of one of the TaskManagers
> > 
> >         "Failed to serialize accumulators for task."
> > 
> >         with an exception stack trace?
> > 
> > 
> >         Stephan
> > 
> > 
> > 
> >         On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> > 
> >             > > >             Stephan,
> > > 
> > >             Thanks for looking at this. Could you elaborate on the misbehavior in the accumulator? I'd like to fix it if it's incorrect.
> > > 
> > >             Dave
> > > 
> > > 
> > > 
> > >                 > > > > On January 20, 2017 at 4:29 AM Stephan Ewen <sewen@apache.org mailto:sewen@apache.org > wrote:
> > > > 
> > > >                 Hi!
> > > > 
> > > >                 It seems that the accumulator behaves in a non-standard way, but the JobManager should also catch that (log a warning or debug message) and simply continue (not crash).
> > > > 
> > > >                 I'll try to add a patch that the JobManager tolerates these kinds of issues in the accumulators.
> > > > 
> > > >                 Stephan
> > > > 
> > > > 
> > > >                 On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> > > > 
> > > >                     > > > > > 
> > > > >                     Noticed I didn't cc the user list.
> > > > > 
> > > > >                         > > > > -------- Original Message ----------
> > > >                         From: Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net >
> > > >                         To: Ted Yu <yuzhihong@gmail.com mailto:yuzhihong@gmail.com >
> > > >                         Date: January 19, 2017 at 12:13 PM
> > > >                         Subject: Re: NPE in JobManager
> > > > 
> > > > 
> > > >                         That might take some time. Here is a hand typed top N lines. If that is not enough let me know and I will start the process of getting the full stack trace.
> > > > 
> > > > 
> > > >                         NullPointerException
> > > > 
> > > >                         at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> > > > 
> > > >                         at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> > > > 
> > > >                         at scala.collection.mutable.ResizableArray$class.forEach(ArrayBuffer.scala:48)
> > > > 
> > > >                         at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
> > > > 
> > > >                         at org.apache.flink.runtime.jobmanager.JobManager.org http://org.apache.flink.runtime.jobmanager.JobManager.org $apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> > > > 
> > > >                         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> > > > 
> > > >                         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > > 
> > > >                         at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun$receive$1.applyOrEslse(LeaderSessionMessageFilter.scala:44)
> > > > 
> > > >                         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > > 
> > > >                         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > > > 
> > > >                         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > > > 
> > > >                         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > > 
> > > >                         at org.apache.flink.runtime.LogMesages$$anon$1.applyOrElse(LogMessages.scala:28)
> > > > 
> > > > 
> > > >                             > > > > > On January 19, 2017 at 11:58 AM Ted Yu <yuzhihong@gmail.com mailto:yuzhihong@gmail.com > wrote:
> > > > > 
> > > > >                             Can you pastebin the complete stack trace for the NPE ?
> > > > > 
> > > > >                             Thanks
> > > > > 
> > > > >                             On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> > > > > 
> > > > >                                 > > > > > > 
> > > > > >                                 I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue where after some period of time (measured in 1 - 3 hours) the JobManager gets an NPE and shuts itself down. The failure is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom accumulator[1], but can't tell from the JobManager code whether the issue is in my Accumulator, or is a bug in the JobManager.
> > > > > > 
> > > > > > 
> > > > > >                                 [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> > > > > > 
> > > > > >                             > > > > > 
> > > > > 
> > > > >                         > > > > 
> > > >                     > > > 
> > >                 > > 
> > 
> >             >              
> 
>         

 

 

Re: Re: NPE in JobManager

Posted by Dave Marion <dl...@comcast.net>.
I do see that message in one of the task manager logs 20ms before the NPE in the JobManager. Looking in that log, there is a ConcurrentModificationException in TreeMap, which my accumulator uses. I'll track this down, thanks for the pointer.


> On January 20, 2017 at 8:27 AM Stephan Ewen <se...@apache.org> wrote:
> 
>     Hi!
> 
>     My current assumption is that there is an accumulator that cannot be serialized. The SortedStringAccumulator looks fine at a first glance, but are there other accumulators involved?
>     Do you see a message like that one in the log of one of the TaskManagers
> 
>     "Failed to serialize accumulators for task."
> 
>     with an exception stack trace?
> 
> 
>     Stephan
> 
> 
> 
>     On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> 
>         > >         Stephan,
> > 
> >         Thanks for looking at this. Could you elaborate on the misbehavior in the accumulator? I'd like to fix it if it's incorrect.
> > 
> >         Dave
> > 
> > 
> > 
> >             > > > On January 20, 2017 at 4:29 AM Stephan Ewen <sewen@apache.org mailto:sewen@apache.org > wrote:
> > > 
> > >             Hi!
> > > 
> > >             It seems that the accumulator behaves in a non-standard way, but the JobManager should also catch that (log a warning or debug message) and simply continue (not crash).
> > > 
> > >             I'll try to add a patch that the JobManager tolerates these kinds of issues in the accumulators.
> > > 
> > >             Stephan
> > > 
> > > 
> > >             On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> > > 
> > >                 > > > > 
> > > >                 Noticed I didn't cc the user list.
> > > > 
> > > >                     > > > -------- Original Message ----------
> > >                     From: Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net >
> > >                     To: Ted Yu <yuzhihong@gmail.com mailto:yuzhihong@gmail.com >
> > >                     Date: January 19, 2017 at 12:13 PM
> > >                     Subject: Re: NPE in JobManager
> > > 
> > > 
> > >                     That might take some time. Here is a hand typed top N lines. If that is not enough let me know and I will start the process of getting the full stack trace.
> > > 
> > > 
> > >                     NullPointerException
> > > 
> > >                     at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> > > 
> > >                     at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> > > 
> > >                     at scala.collection.mutable.ResizableArray$class.forEach(ArrayBuffer.scala:48)
> > > 
> > >                     at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
> > > 
> > >                     at org.apache.flink.runtime.jobmanager.JobManager.org http://org.apache.flink.runtime.jobmanager.JobManager.org $apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> > > 
> > >                     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> > > 
> > >                     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > 
> > >                     at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun$receive$1.applyOrEslse(LeaderSessionMessageFilter.scala:44)
> > > 
> > >                     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > 
> > >                     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > > 
> > >                     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > > 
> > >                     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > 
> > >                     at org.apache.flink.runtime.LogMesages$$anon$1.applyOrElse(LogMessages.scala:28)
> > > 
> > > 
> > >                         > > > > On January 19, 2017 at 11:58 AM Ted Yu <yuzhihong@gmail.com mailto:yuzhihong@gmail.com > wrote:
> > > > 
> > > >                         Can you pastebin the complete stack trace for the NPE ?
> > > > 
> > > >                         Thanks
> > > > 
> > > >                         On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> > > > 
> > > >                             > > > > > 
> > > > >                             I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue where after some period of time (measured in 1 - 3 hours) the JobManager gets an NPE and shuts itself down. The failure is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom accumulator[1], but can't tell from the JobManager code whether the issue is in my Accumulator, or is a bug in the JobManager.
> > > > > 
> > > > > 
> > > > >                             [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> > > > > 
> > > > >                         > > > > 
> > > > 
> > > >                     > > > 
> > >                 > > 
> >             > 
> 
>                  



 

Re: Re: NPE in JobManager

Posted by Stephan Ewen <se...@apache.org>.
Hi!

My current assumption is that there is an accumulator that cannot be
serialized. The SortedStringAccumulator looks fine at a first glance, but
are there other accumulators involved?
Do you see a message like that one in the log of one of the TaskManagers

"Failed to serialize accumulators for task."

with an exception stack trace?


Stephan



On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dl...@comcast.net> wrote:

> Stephan,
>
> Thanks for looking at this. Could you elaborate on the misbehavior in the
> accumulator? I'd like to fix it if it's incorrect.
>
> Dave
>
>
> On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the
> JobManager should also catch that (log a warning or debug message) and
> simply continue (not crash).
>
> I'll try to add a patch that the JobManager tolerates these kinds of
> issues in the accumulators.
>
> Stephan
>
>
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dl...@comcast.net> wrote:
>
>> Noticed I didn't cc the user list.
>>
>> ---------- Original Message ----------
>> From: Dave Marion <dl...@comcast.net>
>> To: Ted Yu <yu...@gmail.com>
>> Date: January 19, 2017 at 12:13 PM
>> Subject: Re: NPE in JobManager
>>
>> That might take some time. Here is a hand typed top N lines. If that is
>> not enough let me know and I will start the process of getting the full
>> stack trace.
>>
>>
>> NullPointerException
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>>
>> at scala.collection.mutable.ResizableArray$class.forEach(ArrayB
>> uffer.scala:48)
>>
>> at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$
>> flink$runtime$jobmanager$JobManager$$updateAccumulators
>> (JobManager.scala:1788)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1.applyOrElse(JobManager.scala:967)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>> unction.scala:36)
>>
>> at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun
>> $receive$1.applyOrEslse(LeaderSessionMessageFilter.scala:44)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>> unction.scala:36)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>> es.scala:33)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>> es.scala:28)
>>
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>
>> at org.apache.flink.runtime.LogMesages$$anon$1.applyOrElse(
>> LogMessages.scala:28)
>>
>>
>> On January 19, 2017 at 11:58 AM Ted Yu <yu...@gmail.com> wrote:
>>
>> Can you pastebin the complete stack trace for the NPE ?
>>
>> Thanks
>>
>> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dl...@comcast.net>
>> wrote:
>>
>> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
>> issue where after some period of time (measured in 1 - 3 hours) the
>> JobManager gets an NPE and shuts itself down. The failure is at
>> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
>> a custom accumulator[1], but can't tell from the JobManager code whether
>> the issue is in my Accumulator, or is a bug in the JobManager.
>>
>>
>> [1] https://github.com/NationalSecurityAgency/timely/blob/master
>> /analytics/src/main/java/timely/analytics/flink/SortedS
>> tringAccumulator.java
>>
>>
>>
>
>
>

Re: Re: NPE in JobManager

Posted by Dave Marion <dl...@comcast.net>.
Stephan,

Thanks for looking at this. Could you elaborate on the misbehavior in the accumulator? I'd like to fix it if it's incorrect.

Dave


> On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
> 
>     Hi!
> 
>     It seems that the accumulator behaves in a non-standard way, but the JobManager should also catch that (log a warning or debug message) and simply continue (not crash).
> 
>     I'll try to add a patch that the JobManager tolerates these kinds of issues in the accumulators.
> 
>     Stephan
> 
> 
>     On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> 
>         > > 
> >         Noticed I didn't cc the user list.
> > 
> >             > -------- Original Message ----------
>             From: Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net >
>             To: Ted Yu <yuzhihong@gmail.com mailto:yuzhihong@gmail.com >
>             Date: January 19, 2017 at 12:13 PM
>             Subject: Re: NPE in JobManager
> 
> 
>             That might take some time. Here is a hand typed top N lines. If that is not enough let me know and I will start the process of getting the full stack trace.
> 
> 
>             NullPointerException
> 
>             at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> 
>             at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> 
>             at scala.collection.mutable.ResizableArray$class.forEach(ArrayBuffer.scala:48)
> 
>             at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
> 
>             at org.apache.flink.runtime.jobmanager.JobManager.org http://org.apache.flink.runtime.jobmanager.JobManager.org $apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> 
>             at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> 
>             at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 
>             at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun$receive$1.applyOrEslse(LeaderSessionMessageFilter.scala:44)
> 
>             at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> 
>             at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 
>             at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 
>             at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 
>             at org.apache.flink.runtime.LogMesages$$anon$1.applyOrElse(LogMessages.scala:28)
> 
> 
>                 > > On January 19, 2017 at 11:58 AM Ted Yu <yuzhihong@gmail.com mailto:yuzhihong@gmail.com > wrote:
> > 
> >                 Can you pastebin the complete stack trace for the NPE ?
> > 
> >                 Thanks
> > 
> >                 On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:
> > 
> >                     > > > 
> > >                     I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue where after some period of time (measured in 1 - 3 hours) the JobManager gets an NPE and shuts itself down. The failure is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom accumulator[1], but can't tell from the JobManager code whether the issue is in my Accumulator, or is a bug in the JobManager.
> > > 
> > > 
> > >                     [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> > > 
> > >                 > > 
> > 
> >             > 
>         


 

Re: Re: NPE in JobManager

Posted by Stephan Ewen <se...@apache.org>.
I opened this issue: https://issues.apache.org/jira/browse/FLINK-5585

Assuming the bug is what I think it is (cannot be 100% sure from just the
small stack trace sample) it should be fixed soon...

On Fri, Jan 20, 2017 at 10:29 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the
> JobManager should also catch that (log a warning or debug message) and
> simply continue (not crash).
>
> I'll try to add a patch that the JobManager tolerates these kinds of
> issues in the accumulators.
>
> Stephan
>
>
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dl...@comcast.net> wrote:
>
>> Noticed I didn't cc the user list.
>>
>> ---------- Original Message ----------
>> From: Dave Marion <dl...@comcast.net>
>> To: Ted Yu <yu...@gmail.com>
>> Date: January 19, 2017 at 12:13 PM
>> Subject: Re: NPE in JobManager
>>
>> That might take some time. Here is a hand typed top N lines. If that is
>> not enough let me know and I will start the process of getting the full
>> stack trace.
>>
>>
>> NullPointerException
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>>
>> at scala.collection.mutable.ResizableArray$class.forEach(ArrayB
>> uffer.scala:48)
>>
>> at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$
>> flink$runtime$jobmanager$JobManager$$updateAccumulators
>> (JobManager.scala:1788)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1.applyOrElse(JobManager.scala:967)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>> unction.scala:36)
>>
>> at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun
>> $receive$1.applyOrEslse(LeaderSessionMessageFilter.scala:44)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialF
>> unction.scala:36)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>> es.scala:33)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessag
>> es.scala:28)
>>
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>
>> at org.apache.flink.runtime.LogMesages$$anon$1.applyOrElse(
>> LogMessages.scala:28)
>>
>>
>> On January 19, 2017 at 11:58 AM Ted Yu <yu...@gmail.com> wrote:
>>
>> Can you pastebin the complete stack trace for the NPE ?
>>
>> Thanks
>>
>> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dl...@comcast.net>
>> wrote:
>>
>> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
>> issue where after some period of time (measured in 1 - 3 hours) the
>> JobManager gets an NPE and shuts itself down. The failure is at
>> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
>> a custom accumulator[1], but can't tell from the JobManager code whether
>> the issue is in my Accumulator, or is a bug in the JobManager.
>>
>>
>> [1] https://github.com/NationalSecurityAgency/timely/blob/master
>> /analytics/src/main/java/timely/analytics/flink/SortedS
>> tringAccumulator.java
>>
>>
>>
>

Re: Re: NPE in JobManager

Posted by Stephan Ewen <se...@apache.org>.
Hi!

It seems that the accumulator behaves in a non-standard way, but the
JobManager should also catch that (log a warning or debug message) and
simply continue (not crash).

I'll try to add a patch that the JobManager tolerates these kinds of issues
in the accumulators.

Stephan


On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dl...@comcast.net> wrote:

> Noticed I didn't cc the user list.
>
> ---------- Original Message ----------
> From: Dave Marion <dl...@comcast.net>
> To: Ted Yu <yu...@gmail.com>
> Date: January 19, 2017 at 12:13 PM
> Subject: Re: NPE in JobManager
>
> That might take some time. Here is a hand typed top N lines. If that is
> not enough let me know and I will start the process of getting the full
> stack trace.
>
>
> NullPointerException
>
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>
> at scala.collection.mutable.ResizableArray$class.forEach(
> ArrayBuffer.scala:48)
>
> at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org$
> apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.
> scala:1788)
>
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
>
> at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>
> at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun$receive$1.
> applyOrEslse(LeaderSessionMessageFilter.scala:44)
>
> at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:33)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(
> LogMessages.scala:28)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at org.apache.flink.runtime.LogMesages$$anon$1.
> applyOrElse(LogMessages.scala:28)
>
>
> On January 19, 2017 at 11:58 AM Ted Yu <yu...@gmail.com> wrote:
>
> Can you pastebin the complete stack trace for the NPE ?
>
> Thanks
>
> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dl...@comcast.net> wrote:
>
> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
> issue where after some period of time (measured in 1 - 3 hours) the
> JobManager gets an NPE and shuts itself down. The failure is at
> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
> a custom accumulator[1], but can't tell from the JobManager code whether
> the issue is in my Accumulator, or is a bug in the JobManager.
>
>
> [1] https://github.com/NationalSecurityAgency/timely/blob/
> master/analytics/src/main/java/timely/analytics/flink/
> SortedStringAccumulator.java
>
>
>

Fwd: Re: NPE in JobManager

Posted by Dave Marion <dl...@comcast.net>.
Noticed I didn't cc the user list.

-------- Original Message ----------
From: Dave Marion <dl...@comcast.net>
To: Ted Yu <yu...@gmail.com>
Date: January 19, 2017 at 12:13 PM
Subject: Re: NPE in JobManager


That might take some time. Here is a hand typed top N lines. If that is not enough let me know and I will start the process of getting the full stack trace.


NullPointerException

at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)

at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)

at scala.collection.mutable.ResizableArray$class.forEach(ArrayBuffer.scala:48)

at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)

at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LeaderSessionMassageFilter$$anonfun$receive$1.applyOrEslse(LeaderSessionMessageFilter.scala:44)

at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at org.apache.flink.runtime.LogMesages$$anon$1.applyOrElse(LogMessages.scala:28)


On January 19, 2017 at 11:58 AM Ted Yu <yu...@gmail.com> wrote:

Can you pastebin the complete stack trace for the NPE ?

Thanks

On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmarion@comcast.net mailto:dlmarion@comcast.net > wrote:

> 
>             I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue where after some period of time (measured in 1 - 3 hours) the JobManager gets an NPE and shuts itself down. The failure is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom accumulator[1], but can't tell from the JobManager code whether the issue is in my Accumulator, or is a bug in the JobManager.
> 
> 
>             [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> 
>         



Re: NPE in JobManager

Posted by Ted Yu <yu...@gmail.com>.
Can you pastebin the complete stack trace for the NPE ?

Thanks

On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dl...@comcast.net> wrote:

> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
> issue where after some period of time (measured in 1 - 3 hours) the
> JobManager gets an NPE and shuts itself down. The failure is at JobManager$$
> updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom
> accumulator[1], but can't tell from the JobManager code whether the issue
> is in my Accumulator, or is a bug in the JobManager.
>
>
> [1] https://github.com/NationalSecurityAgency/timely/
> blob/master/analytics/src/main/java/timely/analytics/
> flink/SortedStringAccumulator.java
>