Posted to user@flink.apache.org by Andrew Roberts <ar...@fuze.com> on 2019/03/07 16:51:57 UTC

Understanding timestamp and watermark assignment errors

Hello,

I’m trying to convert some of our larger stateful computations into something that aligns more closely with the Flink windowing framework, and in particular to start using “event time” instead of “ingestion time” as the time characteristic.

My data comes in from Kafka (0.8.2.2, using the out-of-the-box Kafka source). While it is generally time-ordered, there are some upstream races, so I’m assigning timestamps and watermarks with BoundedOutOfOrdernessTimestampExtractor and an out-of-orderness bound of 30 seconds.

When I assign timestamps directly in the Kafka sources (I’m also connecting two Kafka streams here) using FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work, but my extractor has to do a lot of “faking”, because not every record that is produced has a valid timestamp; for example, a record that can’t be parsed won’t have one.
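To make the watermark behavior concrete: as far as I understand Flink 1.6's BoundedOutOfOrdernessTimestampExtractor, it emits a watermark equal to the highest timestamp seen so far minus the configured bound (30 seconds here). The plain-Java model below reproduces only that rule. It does not depend on Flink, and the class and method names are hypothetical, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of a bounded-out-of-orderness watermark:
 * watermark = highest event timestamp seen so far minus a fixed bound.
 * All names are hypothetical.
 */
class BoundedOutOfOrdernessModel {

    private final long maxOutOfOrdernessMs;
    // Start low enough that the initial watermark (MIN_VALUE) does not underflow.
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an element's event timestamp and returns it unchanged. */
    long extractTimestamp(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Elements with timestamps at or below this value are considered late. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(30_000L);
        // Mostly ordered timestamps; 90 000 arrives 20 s behind the newest element.
        long[] timestamps = {100_000L, 110_000L, 90_000L, 140_000L};
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            model.extractTimestamp(ts);
            watermarks.add(model.currentWatermark());
        }
        System.out.println(watermarks); // [70000, 80000, 80000, 110000]
    }
}
```

With a 30-second bound, the element at 90 000 is still ahead of the watermark at that point (80 000), so an out-of-order record within the bound is not treated as late.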

When I assign timestamps downstream, after filtering the stream down to just records that are going to be windowed, I see errors in my Flink job:

java.io.IOException: Exception while applying AggregateFunction in aggregating state
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
        at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
        ... 6 more

I am calling aggregate() on my windows, but otherwise I see very little information that I can use to dig into this issue. Can anyone give me any insight into what is going wrong here? I’d much prefer assigning timestamps after filtering, rather than in the Kafka source, because I can filter down to only records that I know will have timestamps.
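The ordering preferred here (parse and filter first, then assign timestamps only to records known to carry one) can be sketched without Flink. In a real job, the filtered DataStream would then be passed to assignTimestampsAndWatermarks(). In this sketch, the record format `<epochMillis>,<payload>` and every name are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Plain-Java sketch (no Flink dependency) of "parse and filter first, assign
 * timestamps afterwards". Record format and names are hypothetical.
 */
class FilterThenAssignSketch {

    /** A record that is guaranteed to carry a valid event timestamp. */
    static final class TimedEvent {
        final long timestampMs;
        final String payload;

        TimedEvent(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /** Unparseable input yields Optional.empty() instead of a fake timestamp. */
    static Optional<TimedEvent> parse(String raw) {
        String[] parts = raw.split(",", 2);
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(new TimedEvent(Long.parseLong(parts[0].trim()), parts[1]));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** Drops unparseable records, so the timestamp step only sees valid events. */
    static List<TimedEvent> parseAndFilter(List<String> rawRecords) {
        return rawRecords.stream()
                .map(FilterThenAssignSketch::parse)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TimedEvent> events =
                parseAndFilter(Arrays.asList("1000,a", "garbage", "2000,b", "x,c"));
        // The timestamp step just reads a field that is known to exist.
        List<Long> timestamps =
                events.stream().map(e -> e.timestampMs).collect(Collectors.toList());
        System.out.println(timestamps); // [1000, 2000]
    }
}
```

The design point is that the extractor never has to invent a placeholder timestamp, because records without one are removed before it runs.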

While experimenting with the out-of-orderness bound in my timestamp/watermark assigner, I also saw a similarly opaque exception:

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
        at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
        at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
        at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
        …


Any tips?


Thanks,

Andrew
-- 
*Confidentiality Notice: The information contained in this e-mail and any
attachments may be confidential. If you are not an intended recipient, you
are hereby notified that any dissemination, distribution or copying of this
e-mail is strictly prohibited. If you have received this e-mail in error,
please notify the sender and permanently delete the e-mail and any
attachments immediately. You should not retain, copy or use this e-mail or
any attachment for any purpose, nor disclose all or any part of the
contents to any other person. Thank you.*

Re: Understanding timestamp and watermark assignment errors

Posted by Stefan Richter <s....@ververica.com>.
Hi,

I think this looks like the same problem as in this issue: https://issues.apache.org/jira/browse/FLINK-11420

Best,
Stefan


> On 13. Mar 2019, at 09:41, Konstantin Knauf <ko...@ververica.com> wrote:
> 
> Hi Andrew, 
> 
> generally, this looks like a concurrency problem. 
> 
> Are you using asynchronous checkpointing? If so, could you check whether this issue also occurs with synchronous checkpointing? There have been recent reports that there might be a problem with some Kryo types.
> 
> Can you set the logging level to DEBUG? In that case, we enable some checks in the Kryo serializer to detect whether the KryoSerializer is really being accessed concurrently.
> 
> Are you using any Scala types, in particular collections or "Try"?
> 
> Cheers, 
> 
> Konstantin


Re: Understanding timestamp and watermark assignment errors

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Andrew,

generally, this looks like a concurrency problem.

Are you using asynchronous checkpointing? If so, could you check whether this
issue also occurs with synchronous checkpointing? There have been recent
reports that there might be a problem with some Kryo types.

Can you set the logging level to DEBUG? In that case, we enable some checks
in the Kryo serializer to detect whether the KryoSerializer is really being
accessed concurrently.

Are you using any Scala types, in particular collections or "Try"?
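One way to picture the "concurrency problem" is a serializer that keeps per-operation bookkeeping in instance fields, as the Kryo.reference and IntArray.pop frames in the trace suggest Kryo does. The toy below is hypothetical and is not Kryo's or Flink's code. It spells out an interleaving of two callers step by step (no real threads, so the result is deterministic) and shows that a shared instance hands one caller the other's state, while separate instances, in the spirit of Flink's TypeSerializer#duplicate(), do not.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model (hypothetical; not Kryo or Flink code) of a stateful serializer.
 * Interleaved use of one instance corrupts its bookkeeping; per-caller
 * duplicates keep it consistent.
 */
class SharedStatefulSerializerDemo {

    static final class ToyStatefulSerializer {
        // Pushed when a copy starts, popped when it finishes.
        private final Deque<String> inProgress = new ArrayDeque<>();

        void beginCopy(String objectId) {
            inProgress.push(objectId);
        }

        /** Returns the id this instance believes it just finished copying. */
        String endCopy() {
            if (inProgress.isEmpty()) {
                // Loose analogue of the ArrayIndexOutOfBoundsException: -1 in the trace.
                throw new IllegalStateException("bookkeeping stack is empty");
            }
            return inProgress.pop();
        }

        /** Fresh instance with independent state, like duplicate() for a stateful serializer. */
        ToyStatefulSerializer duplicate() {
            return new ToyStatefulSerializer();
        }
    }

    /** Caller A starts, caller B starts, A finishes, B finishes; returns what A saw. */
    static String interleave(ToyStatefulSerializer forA, ToyStatefulSerializer forB) {
        forA.beginCopy("A");
        forB.beginCopy("B");
        String seenByA = forA.endCopy();
        forB.endCopy();
        return seenByA;
    }

    public static void main(String[] args) {
        ToyStatefulSerializer shared = new ToyStatefulSerializer();
        System.out.println(interleave(shared, shared));                         // B (wrong)
        System.out.println(interleave(shared.duplicate(), shared.duplicate())); // A (correct)
    }
}
```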

Cheers,

Konstantin

On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts <ar...@fuze.com> wrote:

> This is with Flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more
> circumstances.



-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525
<https://www.ververica.com/>
Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

Re: Understanding timestamp and watermark assignment errors

Posted by Andrew Roberts <ar...@fuze.com>.
This is with Flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more circumstances.

> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <ko...@ververica.com> wrote:
> 
> Hi Andrew, 
> 
> which Flink version do you use? This sounds a bit like https://issues.apache.org/jira/browse/FLINK-8836.
> 
> Cheers, 
> 
> Konstantin

Re: Understanding timestamp and watermark assignment errors

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Andrew,

which Flink version do you use? This sounds a bit like
https://issues.apache.org/jira/browse/FLINK-8836.

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <ar...@fuze.com> wrote:

> Hello,
>
> I’m trying to convert some of our larger stateful computations into
> something that aligns more with the Flink windowing framework, and in
> particular to start using “event time” instead of “ingestion time” as the
> time characteristic.
>
> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka
> source), and while my data is generally time-ordered, there are some
> upstream races, so I’m attempting to assign timestamps and watermarks using
> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When
> I assign timestamps directly in the Kafka sources (I’m also connecting two
> Kafka streams here) using
> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my
> extractor has to do a bunch of “faking” because not every record that is
> produced will have a valid timestamp - for example, a record that can’t be
> parsed won’t.
>
> When I assign timestamps downstream, after filtering the stream down to
> just records that are going to be windowed, I see errors in my Flink job:
>
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>         at scala.collection.immutable.List.foreach(List.scala:392)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>         at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>         at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>         ... 6 more
>
> I am calling aggregate() on my windows, but otherwise I see very little
> information that I can use to dig into this issue. Can anyone give me any
> insight into what is going wrong here? I’d much prefer assigning timestamps
> after filtering, rather than in the Kafka source, because I can filter down
> to only records that I know will have timestamps.
>
> When experimenting with the lateness in my timestamp/watermark assigner, I
> also saw a similarly opaque exception:
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>         at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>         at org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>         …
>
>
> Any tips?
>
>
> Thanks,
>
> Andrew
>

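Andrew would prefer to filter the stream down to records that are known to carry a timestamp before assigning timestamps and watermarks, instead of faking values inside the Kafka source. Below is a minimal plain-Java sketch of that ordering, assuming a hypothetical `timestampMillis,payload` record format: unparseable records become `Optional.empty()` and are dropped, so the timestamp step never sees them. In a Flink job the same shape would presumably be a flatMap (or map plus filter) followed by DataStream.assignTimestampsAndWatermarks(); the sketch itself does not use Flink, all names in it are hypothetical, and it is not offered as a fix for the Kryo exceptions in the traces.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-Java model of "parse, filter, then assign timestamps".
// The comma-separated record format and all names are hypothetical.
public class FilterBeforeTimestamps {

    // A parsed record that is guaranteed to carry an event timestamp.
    public static final class Event {
        public final long timestampMs;
        public final String payload;

        public Event(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Returns empty for records that cannot be parsed, instead of a faked timestamp.
    public static Optional<Event> parse(String raw) {
        String[] parts = raw.split(",", 2);
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Event(Long.parseLong(parts[0].trim()), parts[1]));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Only records that survive parsing would reach the timestamp/watermark step.
    public static List<Event> parseAndFilter(List<String> rawRecords) {
        return rawRecords.stream()
                .map(FilterBeforeTimestamps::parse)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("1000,a", "garbage", "abc,b", "2000,c");
        for (Event e : parseAndFilter(raw)) {
            System.out.println(e.timestampMs + " " + e.payload);
        }
    }
}
```

Returning an Optional rather than a sentinel timestamp keeps the "no timestamp" case out of the watermark logic entirely, which is the property Andrew is after when he filters first.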
