Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@smartlab.ws> on 2017/09/29 13:11:22 UTC

ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Hi, I'm coming across these exceptions while running a pretty simple Flink job.

First one:
java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
        at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in watermark processing: in my
case, watermarks are generated in my Kafka source:

val stream = env.addSource(
  new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
    .setStartFromLatest()
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
        def extractTimestamp(element: AirTrafficEvent): Long =
          element.instantValues.time.getMillis
      })
)

These exceptions aren't really that informative per se and, from what I
see, the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")
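
For context: maxBy on a windowed stream is executed as a ReduceFunction over per-window reducing state, which is why both traces point at HeapReducingState. A rough sketch of the equivalent explicit reduce, assuming an illustrative top-level `timestamp: Long` field on the events:

// Hedged sketch, not the actual job code: maxBy("timestamp") behaves roughly
// like this explicit reduce over the window's reducing state.
// The `timestamp: Long` field name is illustrative.
val latestPerKey = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .reduce((a, b) => if (a.timestamp >= b.timestamp) a else b)
  .name("latest_time").uid("latest_time")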

What could be the problem here, in your opinion? Is it not emitting
watermarks correctly? I'm not even sure how I could reproduce these
exceptions, since they seem to happen pretty much randomly.

Thank you all,
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Aljoscha Krettek <al...@apache.org>.
I think I finally found the problem; there was already another bug report for this: https://issues.apache.org/jira/browse/FLINK-7484
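
For reference, the workaround that held up later in this thread (until FLINK-7484 is fixed) is to disable asynchronous snapshots on the heap-based state backend. A minimal sketch, assuming the Flink 1.3 FsStateBackend(checkpointDataUri, asynchronousSnapshots) constructor and a placeholder checkpoint URI:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Placeholder checkpoint URI; the second argument disables asynchronous snapshots.
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))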

> On 12. Oct 2017, at 18:22, Federico D'Ambrosio <fe...@smartlab.ws> wrote:
> 
> Hi Aljoscha, 
> 
> yes, just like you're guessing, without asynchronous checkpoints, there has been no crash so far.
> 
> Regards,
> Federico
> 
> 2017-10-12 18:08 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
> Hi Federico,
> 
> I'm guessing the job is still working without asynchronous snapshots? I'm very eager to figure out what is actually going wrong with asynchronous checkpoints.
> 
> Best,
> Aljoscha
> 
> 
>> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>> 
>> As a followup:
>> 
>> the Flink job currently has an uptime of almost 24 hours, with no failed checkpoints or restarts, whereas with async snapshots it would have already crashed 50 or so times.
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:
>> Thank you very much, Gordon.
>> 
>> I'll try to run the job without the asynchronous snapshots first thing.
>> 
>> As for the Event data type: it's a case class with 2 fields: a String ID and a composite case class (let's call it RealEvent) containing 3 fields of the following types: Information, which is a case class with String fields; Coordinates, a nested case class with 2 Doubles; and InstantValues, with 3 Integers and a DateTime. This DateTime field in InstantValues is the one being evaluated in the maxBy (via the InstantValues and RealEvent compareTo implementations, because dot notation is not working in Scala as of 1.3.2, FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>), and that was the reason I had to register the JodaDateTimeSerializer with Kryo in the first place.
>> 
>> Regards,
>> Federico
>> 
>> 
>> 
>> 
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>> Hi,
>> 
>> Thanks for the extra info, it was helpful (I’m not sure why your first logs didn’t have the full trace, though).
>> 
>> I spent some time digging through the error trace, and currently have some observations I would like to go through first:
>> 
>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while trying to access the state and making a copy (via serialization) in the CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the reducing window function (i.e. the maxBy). The state type should be the same as the records in your `events` DataStream, which seems to be a Scala case class with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to copy that field...
>> 
>> My current guess would perhaps be that the serializer internally used may have been incorrectly shared, which is probably why this exception happens randomly for you.
>> I recall that there were similar issues that occurred before due to the fact that some KryoSerializers aren't thread-safe and were incorrectly shared in Flink.
>> 
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots and try running this job a bit more to see if the problem still occurs? This is mainly to eliminate my guess on whether or not there is some incorrect serializer usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream records type case class looks like?
>> 
>> Also looping in Aljoscha and Stefan here, as they would probably have more insights in this.
>> 
>> Cheers,
>> Gordon
>> 
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:
>> 
>>> Hi Gordon,
>>> 
>>> I remembered that I had already seen this kind of exception once during the testing of the current job and fortunately I had the complete stacktrace still saved on my pc:
>>> 
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> 
>>> I don't know why the stacktrace is now only being output for the first parts (handleWatermark and HeapReducingState).
>>> 
>>> So, it looks like it has something to do with the KryoSerializer. As a KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>>> 
>>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
>>> 
>>> I hope this could help.
>>> 
>>> Regards,
>>> Federico
>>> 
>>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:
>>> Hi Gordon,
>>> 
>>> I'm currently using Flink 1.3.2 in local mode.
>>> 
>>> If it's any help I realized from the log that the complete task which is failing is:
>>> 
>>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>> 
>>> val events = keyedStreamByID 
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>> 
>>> 
>>> val activeStream = events
>>>   //Serialization to JsValue
>>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>   //Global windowing, the cause of exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>> 
>>> val historyStream = airtrafficEvents
>>>   //Serialization to JsValue
>>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>   //Global windowing, the cause of exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>> 
>>> 
>>> 
>>> Regards,
>>> Federico
>>> 
>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>>> Hi,
>>> 
>>> I’m looking into this. Could you let us know the Flink version in which the exceptions occurred?
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> 
>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:
>>> 
>>>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>>> First one:
>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:   
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>> 
>>>> The second one:
>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>> 
>>>> 
>>>> Since it looks like something is wrong in watermark processing: in my case, watermarks are generated in my Kafka source:
>>>> 
>>>> val stream = env.addSource(
>>>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>     .setStartFromLatest()
>>>>     .assignTimestampsAndWatermarks(
>>>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>>>           element.instantValues.time.getMillis
>>>>       })
>>>> )
>>>> These exceptions aren't really that informative per se and, from what I see, the task triggering these exceptions is the following operator:
>>>> 
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>> 
>>>> What could be the problem here, in your opinion? Is it not emitting watermarks correctly? I'm not even sure how I could reproduce these exceptions, since they seem to happen pretty much randomly.
>>>> 
>>>> Thank you all,
>>>> Federico D'Ambrosio
>>> 
>>> 
>>> 
>>> --
>>> Federico D'Ambrosio
>>> 
>>> 
>>> 
>>> --
>>> Federico D'Ambrosio
>> 
>> 
>> 
>> -- 
>> Federico D'Ambrosio
>> 
>> 
>> 
>> -- 
>> Federico D'Ambrosio
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hi Aljoscha,

yes, just like you're guessing, without asynchronous checkpoints, there has
been no crash so far.

Regards,
Federico

2017-10-12 18:08 GMT+02:00 Aljoscha Krettek <al...@apache.org>:

> Hi Federico,
>
> I'm guessing the job is still working without asynchronous snapshots? I'm
> very eager to figure out what is actually going wrong with asynchronous
> checkpoints.
>
> Best,
> Aljoscha
>
>
> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <
> federico.dambrosio@smartlab.ws> wrote:
>
> As a followup:
>
> the Flink job currently has an uptime of almost 24 hours, with no failed
> checkpoints or restarts, whereas with async snapshots it would have
> already crashed 50 or so times.
>
> Regards,
> Federico
>
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
> federico.dambrosio@smartlab.ws>:
>
>> Thank you very much, Gordon.
>>
>> I'll try to run the job without the asynchronous snapshots first thing.
>>
>> As for the Event data type: it's a case class with 2 fields: a String ID
>> and a composite case class (let's call it RealEvent) containing 3 fields of
>> the following types: Information, which is a case class with String fields,
>> Coordinates, a nested case class with 2 Doubles, and InstantValues, with 3
>> Integers and a DateTime. This DateTime field in InstantValues is the one
>> being evaluated in the maxBy (via the InstantValues and RealEvent compareTo
>> implementations, because dot notation is not working in Scala as of 1.3.2,
>> FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>) and that
>> was the reason in the first place I had to register the
>> JodaDateTimeSerializer with Kryo.
>>
>> Regards,
>> Federico
>>
>>
>>
>>
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
>>
>>> Hi,
>>>
>>> Thanks for the extra info, it was helpful (I’m not sure why your first
>>> logs didn’t have the full trace, though).
>>>
>>> I spent some time digging through the error trace, and currently have
>>> some observations I would like to go through first:
>>>
>>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>>> trying to access the state and making a copy (via serialization) in the
>>> CopyOnWriteStateTable.
>>> 2. The state that caused the exception seems to be the state of the
>>> reducing window function (i.e. the maxBy). The state type should be the
>>> same as the records in your `events` DataStream, which seems to be a Scala
>>> case class with some nested field that requires Kryo for serialization.
>>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>>> trying to copy that field...
>>>
>>> My current guess would perhaps be that the serializer internally used
>>> may have been incorrectly shared, which is probably why this exception
>>> happens randomly for you.
>>> I recall that there were similar issues that occurred before due to the
>>> fact that some KryoSerializers aren't thread-safe and were incorrectly
>>> shared in Flink.
>>>
>>> I may need some help from you to be able to look at this a bit more:
>>> - Is it possible that you disable asynchronous snapshots and try running
>>> this job a bit more to see if the problem still occurs? This is mainly to
>>> eliminate my guess on whether or not there is some incorrect serializer
>>> usage in the CopyOnWriteStateTable.
>>> - Could you let us know what your `events` DataStream records type case
>>> class looks like?
>>>
>>> Also looping in Aljoscha and Stefan here, as they would probably have
>>> more insights in this.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>>> federico.dambrosio@smartlab.ws) wrote:
>>>
>>> Hi Gordon,
>>>
>>> I remembered that I had already seen this kind of exception once during
>>> the testing of the current job and fortunately I had the complete
>>> stacktrace still saved on my pc:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>> I don't know why the stacktrace is now only being output for the first
>>> parts (handleWatermark and HeapReducingState).
>>>
>>> So, it looks like it has something to do with the KryoSerializer. As a
>>> KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>>>
>>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime],
>>> classOf[JodaDateTimeSerializer])
>>>
>>> I hope this could help.
>>>
>>> Regards,
>>> Federico
>>>
>>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <
>>> federico.dambrosio@smartlab.ws>:
>>>
>>>> Hi Gordon,
>>>>
>>>> I'm currently using Flink 1.3.2 in local mode.
>>>>
>>>> If it's any help I realized from the log that the complete task which
>>>> is failing is:
>>>>
>>>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>>>
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>>>
>>>>
>>>> val activeStream = events
>>>>   //Serialization to JsValue
>>>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>>   //Global windowing, the cause of exception should be above
>>>>   .timeWindowAll(Time.seconds(10))
>>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>>>
>>>> val historyStream = airtrafficEvents
>>>>   //Serialization to JsValue
>>>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>>   //Global windowing, the cause of exception should be above
>>>>   .timeWindowAll(Time.seconds(10))
>>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Federico
>>>>
>>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I’m looking into this. Could you let us know the Flink version in
>>>>> which the exceptions occurred?
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>>>>> federico.dambrosio@smartlab.ws) wrote:
>>>>>
>>>>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>>>>
>>>>> First one:
>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>> The second one:
>>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>>
>>>>>
>>>>> Since it looks like something is wrong in watermark processing: in my case, watermarks are generated in my Kafka source:
>>>>>
>>>>> val stream = env.addSource(
>>>>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>>     .setStartFromLatest()
>>>>>     .assignTimestampsAndWatermarks(
>>>>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>>>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>>>>           element.instantValues.time.getMillis
>>>>>       })
>>>>> )
>>>>>
>>>>> These exceptions aren't really that informative per se and, from what
>>>>> I see, the task triggering these exceptions is the following operator:
>>>>>
>>>>> val events = keyedStreamByID
>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>>>
>>>>> What could be the problem here, in your opinion? Is it not emitting
>>>>> watermarks correctly? I'm not even sure how I could reproduce these
>>>>> exceptions, since they seem to happen pretty much randomly.
>>>>>
>>>>> Thank you all,
>>>>> Federico D'Ambrosio
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Federico D'Ambrosio
>>>>
>>>
>>>
>>>
>>> --
>>> Federico D'Ambrosio
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Federico,

I'm guessing the job is still working without asynchronous snapshots? I'm very eager to figure out what is actually going wrong with asynchronous checkpoints.

Best,
Aljoscha


> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <fe...@smartlab.ws> wrote:
> 
> As a followup:
> 
> the Flink job currently has an uptime of almost 24 hours, with no failed checkpoints or restarts, whereas with async snapshots it would have already crashed 50 or so times.
> 
> Regards,
> Federico
> 
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:
> Thank you very much, Gordon.
> 
> I'll try to run the job without the asynchronous snapshots first thing.
> 
> As for the Event data type: it's a case class with 2 fields: a String ID and a composite case class (let's call it RealEvent) containing 3 fields of the following types: Information, which is a case class with String fields; Coordinates, a nested case class with 2 Doubles; and InstantValues, with 3 Integers and a DateTime. This DateTime field in InstantValues is the one being evaluated in the maxBy (via the InstantValues and RealEvent compareTo implementations, because dot notation is not working in Scala as of 1.3.2, FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>), and that was the reason I had to register the JodaDateTimeSerializer with Kryo in the first place.
> 
> Regards,
> Federico
> 
> 
> 
> 
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
> Hi,
> 
> Thanks for the extra info, it was helpful (I’m not sure why your first logs didn’t have the full trace, though).
> 
> I spent some time digging through the error trace, and currently have some observations I would like to go through first:
> 
> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while trying to access the state and making a copy (via serialization) in the CopyOnWriteStateTable.
> 2. The state that caused the exception seems to be the state of the reducing window function (i.e. the maxBy). The state type should be the same as the records in your `events` DataStream, which seems to be a Scala case class with some nested field that requires Kryo for serialization.
> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to copy that field...
> 
> My current guess would perhaps be that the serializer internally used may have been incorrectly shared, which is probably why this exception happens randomly for you.
> I recall that there were similar issues that occurred before due to the fact that some KryoSerializers aren't thread-safe and were incorrectly shared in Flink.
> 
> I may need some help from you to be able to look at this a bit more:
> - Is it possible that you disable asynchronous snapshots and try running this job a bit more to see if the problem still occurs? This is mainly to eliminate my guess on whether or not there is some incorrect serializer usage in the CopyOnWriteStateTable.
> - Could you let us know what your `events` DataStream records type case class looks like?
> 
> Also looping in Aljoscha and Stefan here, as they would probably have more insights in this.
> 
> Cheers,
> Gordon
> 
> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:
> 
>> Hi Gordon,
>> 
>> I remembered that I had already seen this kind of exception once during the testing of the current job and fortunately I had the complete stacktrace still saved on my pc:
>> 
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>> 
>> I don't know why the stacktrace is now only being output for the first parts (handleWatermark and HeapReducingState).
>>
>> So, it looks like it has something to do with the KryoSerializer. As a KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>> 
>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
>> 
>> I hope this could help.
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambrosio@smartlab.ws>:
>> Hi Gordon,
>> 
>> I'm currently using Flink 1.3.2 in local mode.
>> 
>> If it's any help I realized from the log that the complete task which is failing is:
>> 
>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>> 
>> val events = keyedStreamByID 
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("time").name("latest_time").uid("latest_time")
>> 
>> 
>> val activeStream = events
>>   //Serialization to JsValue
>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>   //Global windowing, the cause of exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>> 
>> val historyStream = airtrafficEvents
>>   //Serialization to JsValue
>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>   //Global windowing, the cause of exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>> 
>> 
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
>> Hi,
>> 
>> I’m looking into this. Could you let us know the Flink version in which the exceptions occurred?
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:
>> 
>>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>> First one:
>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:   
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>> 
>>> The second one:
>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>> 
>>> 
>>> Since it looks like something is wrong in watermark processing: in my case, watermarks are generated in my Kafka source:
>>> 
>>> val stream = env.addSource(
>>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
>>>     .setStartFromLatest()
>>>     .assignTimestampsAndWatermarks(
>>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>>           element.instantValues.time.getMillis
>>>       })
>>> )
>>> These exceptions aren't really that informative per se and, from what I see, the task triggering these exceptions is the following operator:
>>> 
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>> 
>>> What could be the problem here, in your opinion? Is it not emitting watermarks correctly? I'm not even sure how I could reproduce these exceptions, since they seem to happen pretty much randomly.
>>> 
>>> Thank you all,
>>> Federico D'Ambrosio
>> 
>> 
>> 
>> --
>> Federico D'Ambrosio
>> 
>> 
>> 
>> --
>> Federico D'Ambrosio
> 
> 
> 
> -- 
> Federico D'Ambrosio
> 
> 
> 
> -- 
> Federico D'Ambrosio


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
As a followup:

the Flink job currently has an uptime of almost 24 hours, with no failed
checkpoints or restarts, whereas with async snapshots it would have
already crashed 50 or so times.

Regards,
Federico

2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
federico.dambrosio@smartlab.ws>:

> Thank you very much, Gordon.
>
> I'll try to run the job without the asynchronous snapshots first thing.
>
> As for the Event data type: it's a case class with 2 fields: a String ID
> and a composite case class (let's call it RealEvent) containing 3 fields of
> the following types: Information, which is a case class with String fields,
> Coordinates, a nested case class with 2 Doubles, and InstantValues, with 3
> Integers and a DateTime. This DateTime field in InstantValues is the one
> being evaluated in the maxBy (via the InstantValues and RealEvent compareTo
> implementations, because dot notation is not working in Scala as of 1.3.2,
> FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>) and that
> was the reason in the first place I had to register the
> JodaDateTimeSerializer with Kryo.
>
> Regards,
> Federico
>
>
>
>
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
>
>> Hi,
>>
>> Thanks for the extra info, it was helpful (I’m not sure why your first
>> logs didn’t have the full trace, though).
>>
>> I spent some time digging through the error trace, and currently have
>> some observations I would like to go through first:
>>
>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>> trying to access the state and making a copy (via serialization) in the
>> CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the
>> reducing window function (i.e. the maxBy). The state type should be the
>> same as the records in your `events` DataStream, which seems to be a Scala
>> case class with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>> trying to copy that field...
>>
>> My current guess would perhaps be that the serializer internally used may
>> have been incorrectly shared, which is probably why this exception happens
>> randomly for you.
>> I recall that there were similar issues that occurred before due to the
>> fact that some KryoSerializers aren't thread-safe and were incorrectly
>> shared in Flink.
>>
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots and try running
>> this job a bit more to see if the problem still occurs? This is mainly to
>> eliminate my guess on whether or not there is some incorrect serializer
>> usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream records type case
>> class looks like?
>>
>> Also looping in Aljoscha and Stefan here, as they would probably have
>> more insights in this.
>>
>> Cheers,
>> Gordon
>>
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>> federico.dambrosio@smartlab.ws) wrote:
>>
>> Hi Gordon,
>>
>> I remembered that I had already seen this kind of exception once during
>> the testing of the current job and fortunately I had the complete
>> stacktrace still saved on my pc:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> I don't know why the stacktrace is now only being output for the first
>> parts (handleWatermark and HeapReducingState).
>>
>> So, it looks like it has something to do with the KryoSerializer. As a
>> KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>>
>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime],
>> classOf[JodaDateTimeSerializer])
>>
>> I hope this could help.
>>
>> Regards,
>> Federico
>>
>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <
>> federico.dambrosio@smartlab.ws>:
>>
>>> Hi Gordon,
>>>
>>> I'm currently using Flink 1.3.2 in local mode.
>>>
>>> If it's any help I realized from the log that the complete task which is
>>> failing is:
>>>
>>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>>
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("time").name("latest_time").uid("latest_time")
>>>
>>>
>>> val activeStream = events
>>>   //Serialization to JsValue
>>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>>   //Global windowing, the cause of exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>>
>>> val historyStream = airtrafficEvents
>>>   //Serialization to JsValue
>>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>>   //Global windowing, the cause of exception should be above
>>>   .timeWindowAll(Time.seconds(10))
>>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>>
>>>
>>>
>>> Regards,
>>> Federico
>>>
>>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
>>>
>>>> Hi,
>>>>
>>>> I’m looking into this. Could you let us know the Flink version in which
>>>> the exceptions occurred?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>>>> federico.dambrosio@smartlab.ws) wrote:
>>>>
>>>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>>>
>>>> First one:
>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>> The second one:
>>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>>
>>>>
>>>> Since it looks like something is wrong in watermark processing: in my case, watermarks are generated in my Kafka source:
>>>>
>>>> val stream = env.addSource(
>>>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
>>>>     .setStartFromLatest()
>>>>     .assignTimestampsAndWatermarks(
>>>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>>>           element.instantValues.time.getMillis
>>>>       })
>>>> )
>>>>
>>>> These exceptions aren't really that informative per se and, from what I
>>>> see, the task triggering these exceptions is the following operator:
>>>>
>>>> val events = keyedStreamByID
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>>
>>>> What could be the problem here, in your opinion? Is it not emitting
>>>> watermarks correctly? I'm not even sure how I could reproduce these
>>>> exceptions, since they seem to happen pretty much randomly.
>>>>
>>>> Thank you all,
>>>> Federico D'Ambrosio
>>>>
>>>>
>>>
>>>
>>> --
>>> Federico D'Ambrosio
>>>
>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>>
>
>
> --
> Federico D'Ambrosio
>



-- 
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Thank you very much, Gordon.

I'll try to run the job without the asynchronous snapshots first thing.

As for the Event data type: it's a case class with 2 fields: a String ID
and a composite case class (let's call it RealEvent) containing 3 fields of
the following types: Information, which is a case class with String fields,
Coordinates, a nested case class with 2 Doubles, and InstantValues, with 3
Integers and a DateTime. This DateTime field in InstantValues is the one
being evaluated in the maxBy (via the InstantValues and RealEvent compareTo
implementations, because dot notation is not working in Scala as of 1.3.2,
FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>) and that was
the reason in the first place I had to register the JodaDateTimeSerializer
with Kryo.
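
For reference, a minimal sketch of the described type, assuming illustrative field names (only the structure above is from the thread):

import org.joda.time.DateTime

case class Information(source: String, status: String)
case class Coordinates(lat: Double, lon: Double)

case class InstantValues(speed: Int, altitude: Int, heading: Int, time: DateTime)
  extends Ordered[InstantValues] {
  // Joda DateTime is Comparable, so the maxBy can be driven by the time field
  override def compare(that: InstantValues): Int = this.time.compareTo(that.time)
}

case class RealEvent(info: Information, coordinates: Coordinates, instantValues: InstantValues)
  extends Ordered[RealEvent] {
  override def compare(that: RealEvent): Int = this.instantValues.compare(that.instantValues)
}

case class Event(id: String, event: RealEvent)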

Regards,
Federico




2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:

> Hi,
>
> Thanks for the extra info, it was helpful (I’m not sure why your first
> logs didn’t have the full trace, though).
>
> I spent some time digging through the error trace, and currently have some
> observations I would like to go through first:
>
> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
> trying to access the state and making a copy (via serialization) in the
> CopyOnWriteStateTable.
> 2. The state that caused the exception seems to be the state of the
> reducing window function (i.e. the maxBy). The state type should be the
> same as the records in your `events` DataStream, which seems to be a Scala
> case class with some nested field that requires Kryo for serialization.
> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying
> to copy that field...
>
> My current guess would perhaps be that the serializer internally used may
> have been incorrectly shared, which is probably why this exception happens
> randomly for you.
> I recall that there were similar issues that occurred before due to the
> fact that some KryoSerializers aren't thread-safe and were incorrectly
> shared in Flink.
>
> I may need some help from you to be able to look at this a bit more:
> - Is it possible that you disable asynchronous snapshots and try running
> this job a bit more to see if the problem still occurs? This is mainly to
> eliminate my guess on whether or not there is some incorrect serializer
> usage in the CopyOnWriteStateTable.
> - Could you let us know what your `events` DataStream records type case
> class looks like?
>
> Also looping in Aljoscha and Stefan here, as they would probably have more
> insights in this.
>
> Cheers,
> Gordon
>
> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
> federico.dambrosio@smartlab.ws) wrote:
>
> Hi Gordon,
>
> I remembered that I had already seen this kind of exception once during
> the testing of the current job and fortunately I had the complete
> stacktrace still saved on my pc:
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>         at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
>
> I don't know why the stacktrace is now only being output for the first
> parts (handleWatermark and HeapReducingState).
>
> So, it looks like it has something to do with the KryoSerializer. As a
> custom Kryo serializer I'm using JodaDateTimeSerializer, registered as follows:
>
> env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
>
> I hope this helps.
>
> Regards,
> Federico
>
> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <
> federico.dambrosio@smartlab.ws>:
>
>> Hi Gordon,
>>
>> I'm currently using Flink 1.3.2 in local mode.
>>
>> If it's any help, I noticed from the log that the complete task that is
>> failing is:
>>
>> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("time").name("latest_time").uid("latest_time")
>>
>>
>> val activeStream = events
>>   //Serialization to JsValue
>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>   //Global windowing, the cause of exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>
>> val historyStream = events
>>   //Serialization to JsValue
>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>   //Global windowing, the cause of exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>
>>
>>
>> Regards,
>> Federico
>>
>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
>>
>>> Hi,
>>>
>>> I’m looking into this. Could you let us know the Flink version in which
>>> the exceptions occurred?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>>> federico.dambrosio@smartlab.ws) wrote:
>>>
>>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>>
>>> First one:
>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>> The second one:
>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>>
>>> Since it looks like something is wrong in Watermark processing, in my case Watermarks are generated in my KafkaSource:
>>>
>>> val stream = env.addSource(
>>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>>     .setStartFromLatest()
>>>     .assignTimestampsAndWatermarks(
>>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>>         def extractTimestamp(element: Event): Long =
>>>           element.instantValues.time.getMillis
>>>       })
>>> )
>>>
>>> These exceptions aren't really that informative per se and, from what I
>>> see, the task triggering these exceptions is the following operator:
>>>
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>
>>> What could be the problem here, in your opinion? Is it not emitting
>>> watermarks correctly? I'm not even sure how I could reproduce these
>>> exceptions, since they seem to happen pretty much randomly.
>>>
>>> Thank you all,
>>> Federico D'Ambrosio
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>
>
>
> --
> Federico D'Ambrosio
>
>


-- 
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Thanks for the extra info, it was helpful (I’m not sure why your first logs didn’t have the full trace, though).

I spent some time digging through the error trace, and currently have some observations I would like to go through first:

1. So it seems like the ArrayIndexOutOfBoundsException was thrown while trying to access the state and make a copy (via serialization) in the CopyOnWriteStateTable.
2. The state that caused the exception seems to be the state of the reducing window function (i.e. the maxBy). The state type should be the same as the records in your `events` DataStream, which seems to be a Scala case class with some nested field that requires Kryo for serialization (see the sketch right after this list).
3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to copy that field...
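
To make observation 2 concrete, here is a purely hypothetical sketch of such a record type. The `instantValues`/`time` names are taken from the field access `element.instantValues.time.getMillis` in your first mail; every other name is made up:

import org.joda.time.DateTime

// Hypothetical sketch only: the nested Joda DateTime has no built-in Flink
// serializer, so the CaseClassSerializer delegates that field to Kryo --
// matching the CaseClassSerializer -> KryoSerializer frames in your trace.
case class InstantValues(time: DateTime)
case class AirTrafficEvent(id: String, instantValues: InstantValues)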

My current guess is that the internally used serializer may have been incorrectly shared, which is probably why this exception happens randomly for you.
I recall similar issues occurring before because some KryoSerializers aren't thread-safe and were incorrectly shared in Flink.

I may need some help from you to be able to look at this a bit more:
- Is it possible that you disable asynchronous snapshots (a sketch of what I mean follows below this list) and try running this job a bit more to see if the problem still occurs? This is mainly to eliminate my guess on whether or not there is some incorrect serializer usage in the CopyOnWriteStateTable.
- Could you let us know what the record type (case class) of your `events` DataStream looks like?
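
On the first point, a minimal sketch of what I mean, assuming the heap-based FsStateBackend (the boolean constructor flag should be available since 1.3; the checkpoint URI is only a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The second constructor argument toggles asynchronous snapshots; passing
// false makes the heap keyed-state backend take synchronous snapshots.
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))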

Also looping in Aljoscha and Stefan here, as they would probably have more insight into this.

Cheers,
Gordon

On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:

Hi Gordon,

I remembered that I had already seen this kind of exception once during the testing of the current job and fortunately I had the complete stacktrace still saved on my PC:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
        at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

I don't know why the stacktrace is now only being output for the first parts (handleWatermark and HeapReducingState).

So, it looks like it has something to do with the KryoSerializer. As a custom Kryo serializer I'm using JodaDateTimeSerializer, registered as follows:

env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])

I hope this helps.

Regards,
Federico

2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <fe...@smartlab.ws>:
Hi Gordon,

I'm currently using Flink 1.3.2 in local mode.

If it's any help, I noticed from the log that the complete task that is failing is:

2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.

val events = keyedStreamByID 
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("time").name("latest_time").uid("latest_time")


val activeStream = events
  //Serialization to JsValue
  .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

val historyStream = events
  //Serialization to JsValue
  .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")



Regards,
Federico

2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
Hi,

I’m looking into this. Could you let us know the Flink version in which the exceptions occurred?

Cheers,
Gordon


On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:

Hi, I'm coming across these exceptions while running a pretty simple Flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:   
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
        at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in Watermark processing, in my case Watermarks are generated in my KafkaSource:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
    .setStartFromLatest()
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        def extractTimestamp(element: Event): Long =
          element.instantValues.time.getMillis
      })
)
These exceptions aren't really that informative per se and, from what I see, the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here, in your opinion? Is it not emitting watermarks correctly? I'm not even sure how I could reproduce these exceptions, since they seem to happen pretty much randomly.

Thank you all,
Federico D'Ambrosio



--
Federico D'Ambrosio



--
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hi Gordon,

I remembered that I had already seen this kind of exception once during the
testing of the current job and fortunately I had the complete stacktrace
still saved on my PC:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
        at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

I don't know why the stacktrace is now only being output for the first
parts (handleWatermark and HeapReducingState).

So, it looks like it has something to do with the KryoSerializer. As a
custom Kryo serializer I'm using JodaDateTimeSerializer, registered as follows:

env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
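
For completeness: this is the JodaDateTimeSerializer that ships with the de.javakaffee kryo-serializers artifact, so a self-contained version of the registration looks like this (the sbt version number is only an example):

// sbt: libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.42"
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
import org.joda.time.DateTime

env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])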

I hope this helps.

Regards,
Federico

2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <
federico.dambrosio@smartlab.ws>:

> Hi Gordon,
>
> I'm currently using Flink 1.3.2 in local mode.
>
> If it's any help, I noticed from the log that the complete task that is
> failing is:
>
> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("time").name("latest_time").uid("latest_time")
>
>
> val activeStream = events
>   //Serialization to JsValue
>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>   //Global windowing, the cause of exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>
> val historyStream = events
>   //Serialization to JsValue
>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>   //Global windowing, the cause of exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>
>
>
> Regards,
> Federico
>
> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:
>
>> Hi,
>>
>> I’m looking into this. Could you let us know the Flink version in which
>> the exceptions occurred?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>> federico.dambrosio@smartlab.ws) wrote:
>>
>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>
>> First one:
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>> The second one:
>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>>
>> Since it looks like something is wrong in Watermark processing, in my case Watermarks are generated in my KafkaSource:
>>
>> val stream = env.addSource(
>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>     .setStartFromLatest()
>>     .assignTimestampsAndWatermarks(
>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>         def extractTimestamp(element: Event): Long =
>>           element.instantValues.time.getMillis
>>       })
>> )
>>
>> These exceptions aren't really that informative per se and, from what I
>> see, the task triggering these exceptions is the following operator:
>>
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>
>> What could be the problem here, in your opinion? Is it not emitting
>> watermarks correctly? I'm not even sure how I could reproduce these
>> exceptions, since they seem to happen pretty much randomly.
>>
>> Thank you all,
>> Federico D'Ambrosio
>>
>>
>
>
> --
> Federico D'Ambrosio
>



-- 
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hi Gordon,

I'm currently using Flink 1.3.2 in local mode.

If it's any help, I noticed from the log that the complete task that is
failing is:

2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("time").name("latest_time").uid("latest_time")


val activeStream = events
  //Serialization to JsValue
  .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

val historyStream = events
  //Serialization to JsValue
  .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
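
As an aside, if I understand the API correctly, maxBy on a windowed stream is just shorthand for a ReduceFunction kept in reducing state, which would explain the "ReduceFunction in reducing state" part of the second exception. A rough hand-written equivalent, assuming the record exposes a top-level Joda DateTime field named time (adapt to the real field):

val latestByHand = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  // keep the element with the greater event time; on a tie keep the first
  // one seen, which roughly matches maxBy's default behaviour
  .reduce((a, b) => if (b.time.isAfter(a.time)) b else a)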



Regards,
Federico

2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:

> Hi,
>
> I’m looking into this. Could you let us know the Flink version in which
> the exceptions occurred?
>
> Cheers,
> Gordon
>
>
> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
> federico.dambrosio@smartlab.ws) wrote:
>
> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>
> First one:
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
> The second one:
> java.io.IOException: Exception while applying ReduceFunction in reducing state
>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> Since it looks like something is wrong in Watermark processing, in my case Watermarks are generated in my KafkaSource:
>
> val stream = env.addSource(
>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>     .setStartFromLatest()
>     .assignTimestampsAndWatermarks(
>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>         def extractTimestamp(element: Event): Long =
>           element.instantValues.time.getMillis
>       })
> )
>
> These exceptions aren't really that informative per se and, from what I
> see, the task triggering these exceptions is the following operator:
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>
> What could be the problem here, in your opinion? Is it not emitting
> watermarks correctly? I'm not even sure how I could reproduce these
> exceptions, since they seem to happen pretty much randomly.
>
> Thank you all,
> Federico D'Ambrosio
>
>


-- 
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I’m looking into this. Could you let us know the Flink version in which the exceptions occurred?

Cheers,
Gordon

On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambrosio@smartlab.ws) wrote:

Hi, I'm coming across these exceptions while running a pretty simple Flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:  
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
        at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in Watermark processing, in my case Watermarks are generated in my KafkaSource:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
    .setStartFromLatest()
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        def extractTimestamp(element: Event): Long =
          element.instantValues.time.getMillis
      })
)
These exceptions aren't really that informative per se and, from what I see, the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here, in your opinion? Is it not emitting watermarks correctly? I'm not even sure how I could reproduce these exceptions, since they seem to happen pretty much randomly.

Thank you all,
Federico D'Ambrosio