Posted to user@flink.apache.org by Daniel Vol <vo...@gmail.com> on 2021/08/11 12:46:32 UTC

Odd Serialization exception

I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO  o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is that I
added some metrics to the producer (marked in red in the original email):

class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    }
    catch {
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}

Is anyone familiar with this and able to point out what may cause this
exception and how I should solve it?

Thanks!

Re: Odd Serialization exception

Posted by Guowei Ma <gu...@gmail.com>.
Hi,
I think you might be right, so you could try calling super.open(...) in
your LogSessionProducer.
Best,
Guowei
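
To illustrate why the suggested super.open(...) call matters, here is a minimal, self-contained sketch. It does not use Flink: BaseProducer, BrokenProducer and FixedProducer are hypothetical stand-ins, and the assumption is only that the base class creates, inside open(), a resource that a later flush depends on.

```scala
// Hypothetical stand-in for a connector sink: open() creates a client
// that a later flush (for example during a checkpoint) depends on.
class BaseProducer {
  private var client: Option[String] = None

  def open(): Unit = {
    client = Some("kinesis-client")
  }

  // Throws if open() of this base class never ran.
  def flush(): String =
    client
      .map(c => s"flushed via $c")
      .getOrElse(throw new IllegalStateException("producer was not opened"))
}

// Mirrors the reported subclass: open() is overridden without super.open().
class BrokenProducer extends BaseProducer {
  var metricsReady: Boolean = false

  override def open(): Unit = {
    metricsReady = true
  }
}

// Same subclass with the suggested fix: initialize the base class first.
class FixedProducer extends BaseProducer {
  var metricsReady: Boolean = false

  override def open(): Unit = {
    super.open()
    metricsReady = true
  }
}

object OpenLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val broken = new BrokenProducer
    broken.open()
    // The subclass state is set, but the base class was never initialized.
    println(s"broken flush failed: ${scala.util.Try(broken.flush()).isFailure}")

    val fixed = new FixedProducer
    fixed.open()
    println(fixed.flush())
  }
}
```

In the real LogSessionProducer the equivalent change would presumably be a super.open(parameters) call as the first statement of the overridden open method, before the metrics are registered.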


On Thu, Aug 12, 2021 at 2:01 PM Daniel Vol <vo...@gmail.com> wrote:

> Hi Guowei,
>
> I am running on EMR 5.32.0 with Flink 1.11.2
>
> In the meantime I ran some tests and commented out part of the new code:
>
> override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
>     try {
> //      val session = value.asInstanceOf[Session]
> //      sessionDuration = 17L //session.getSessionDuration
> //      sessionSize = 19 //session.getSessionTotalEvents
>       super.invoke(value, context)
>       sessionsWritten.inc()
>     }
>
> However, I still get Caused by: org.apache.flink.util.SerializedThrowable:
> null
> So my assumption is that something is wrong with the "override def open()"
> method.
>
> Thanks!
>
> On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <gu...@gmail.com> wrote:
>
>> Hi, Daniel
>> Could you tell me the version of Flink you use? I want to look at the
>> corresponding code.
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vo...@gmail.com> wrote:
>>
>>> Hi Matthias,
>>>
>>> First, thanks for a fast reply.
>>> I am new to Flink, so I am probably missing a lot in terms of the flow and
>>> the objects passed.
>>>
>>> The motivation is to get internal data from the transferred OUT object
>>> in order to send metrics. So I do downcast it, but from my perspective it
>>> is not forwarded (super is called with the original value) or expected to
>>> be used in later steps (it is expected to be a local-scope variable).
>>> As I suspect that you are right, can you point me to how I can get
>>> internal data from OUT without changing it or affecting the next steps?
>>> Also, when I create the object, I specify the OUT type (which is
>>> Session):
>>>
>>> val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)
>>>
>>> "… but of course I might be completely mistaken due to incomplete
>>> information."
>>> What kind of information can I supply?
>>>
>>> Thanks a lot!
>>>
>>> Daniel
>>>
>>> On 11 Aug 2021, at 17:28, Schwalbe Matthias <Ma...@viseca.ch>
>>> wrote:
>>>
>>> Hi Daniel,
>>>
>>> At first glance, one thing catches my eye:
>>>
>>> In the line 'val session = value.asInstanceOf[Session]' it looks like you
>>> are downcasting the event from OUT to Session.
>>>
>>> In Flink this is a dangerous thing to do … DataStream[OUT] uses a
>>> specific serializer[OUT] to transport events from one operator to the next
>>> (or at least from one task to the next, if configured this way).
>>>
>>> These serializers usually only understand one type, OUT in your case.
>>> Only in certain circumstances is the Java object (the event) transported
>>> directly from one operator to the next.
>>>
>>> I guess this is what happened: your serializer, which only understands
>>> OUT, cannot cope with a Session object …
>>>
>>> … but of course I might be completely mistaken due to incomplete
>>> information.
>>>
>>> I hope this helps 😊
>>>
>>> Feel free to get back to me for clarifications (on the mailing list)
>>>
>>> Cheers
>>>
>>> Thias
>>>

Re: Odd Serialization exception

Posted by Daniel Vol <vo...@gmail.com>.
Thank you both!

Looks much better and hopefully works!

Daniel.

On Thu, Aug 12, 2021 at 10:35 AM Schwalbe Matthias <
Matthias.Schwalbe@viseca.ch> wrote:

> Good morning Daniel,
>
> … so my guess was not the cause of your problem 😊. Anyway, it seems like
> you always want to use your LogSessionProducer with Session?
>
> In that case you could drop the generics from the class like this:
>
> class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
>   extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
>   ...
>   override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
>   ...
>
> As to your assumption that the problem could be in your override def open():
> I don't see you invoking super.open(…), and omitting that call would leave
> the producer only half initialized.
>
> Thias
>

RE: Odd Serialization exception

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Good morning Daniel,

… so my guess was not the cause of your problem 😊. Anyway, it seems like you always want to use your LogSessionProducer with Session?
In that case you could drop the generics from the class like this:

class LogSessionProducer(schema: SerializationSchema[Session], props: Properties)
  extends FlinkKinesisProducer[Session](schema, props) with LazyLogging {
  ...
  override def invoke(value: Session, context: SinkFunction.Context[_]): Unit = {
  ...

As to your assumption that the problem could be in your override def open():
I don't see you invoking super.open(…), and omitting that call would leave the producer only half initialized.

Thias
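
The idea of dropping the generics can be tried out without Flink. The following is only a rough sketch: BaseSink, GenericMetricsSink, SessionMetricsSink and this simplified Session case class are hypothetical stand-ins, not the classes from the thread. It contrasts a generic subclass that needs a runtime cast with a subclass fixed to Session, where the compiler checks the type.

```scala
// Hypothetical stand-ins; these are not Flink's classes.
final case class Session(durationMs: Long, totalEvents: Int)

class BaseSink[T] {
  var written: Vector[T] = Vector.empty

  def invoke(value: T): Unit = {
    written = written :+ value
  }
}

// Generic variant: reaching the Session fields needs a runtime cast,
// which throws ClassCastException for any other element type.
class GenericMetricsSink[OUT] extends BaseSink[OUT] {
  var lastDuration: Long = 0L

  override def invoke(value: OUT): Unit = {
    lastDuration = value.asInstanceOf[Session].durationMs
    super.invoke(value)
  }
}

// Non-generic variant: the element type is fixed, so no cast is needed.
class SessionMetricsSink extends BaseSink[Session] {
  var lastDuration: Long = 0L
  var lastSize: Int = 0

  override def invoke(value: Session): Unit = {
    lastDuration = value.durationMs
    lastSize = value.totalEvents
    super.invoke(value)
  }
}

object TypedSinkDemo {
  def main(args: Array[String]): Unit = {
    val typed = new SessionMetricsSink
    typed.invoke(Session(durationMs = 1800000L, totalEvents = 42))
    println(s"duration=${typed.lastDuration} size=${typed.lastSize}")

    // The generic variant compiles for String but fails at runtime.
    val generic = new GenericMetricsSink[String]
    println(s"cast failed: ${scala.util.Try(generic.invoke("not a session")).isFailure}")
  }
}
```

With the type fixed, a mismatch between the stream's element type and the sink would be reported at compile time instead of surfacing as a runtime failure.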




From: Daniel Vol <vo...@gmail.com>
Sent: Donnerstag, 12. August 2021 08:01
To: Guowei Ma <gu...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Odd Serialization exception

Hi Guowei,

I am running on EMR 5.32.0 with Flink 1.11.2

In meanwhile I did some tests and commented out part of the new code -

override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
//      val session = value.asInstanceOf[Session]
//      sessionDuration = 17L //session.getSessionDuration
//      sessionSize = 19 //session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    }
Though I still get Caused by: org.apache.flink.util.SerializedThrowable: null
So, my assumption is that something wrong with "override def open()" method

Thanks!

On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma <gu...@gmail.com>> wrote:
Hi, Daniel
Could you tell me the version of Flink you use? I want to look at the corresponding code.
Best,
Guowei


On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol <vo...@gmail.com>> wrote:
Hi Matthias,

First, thanks for the fast reply.
I am new to Flink, so I am probably missing a lot in terms of the flow and the objects passed.

The motivation is to get internal data from the transferred OUT object in order to send metrics. So I do downcast it, but from my perspective the cast value is not forwarded (super is called with the original value) and is not expected to be used in later steps (it is meant to be a local variable).
As I suspect that you are right, can you point me to how I can get internal data from OUT without changing it or affecting the next steps?
Also, when I create the object, I specify the OUT type (which is Session):

val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)

"… but of course I might be completely mistaken due to incomplete information."
What kind of information can I supply?

Thanks a lot!

Daniel


On 11 Aug 2021, at 17:28, Schwalbe Matthias <Ma...@viseca.ch> wrote:

Hi Daniel,

At first glance there is one thing that catches my eye:
In the line 'val session = value.asInstanceOf[Session]' it looks like you are downcasting the event from OUT to Session.
In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific serializer[OUT] to transport events from one operator to the next (or at least from one task to the next, if configured this way).
These serializers usually understand only one type, OUT in your case. Only in certain circumstances is the Java object (the event) passed directly from one operator to the next.

I guess this is what happened: your serializer, which only understands OUT, cannot cope with a Session object …

… but of course I might be completely mistaken due to incomplete information.
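
As a small illustration of why the cast itself deserves suspicion, independent of any serializer, the sketch below compares a generic sink that downcasts with one that is fixed to the element type. The Session case class and both sink classes are hypothetical stand-ins written for this example; they only show that a cast from an erased type parameter is not checked by the compiler and can fail at runtime, whereas a concrete parameter type needs no cast at all.

```scala
// Hypothetical stand-in for the Session event type mentioned in the thread.
final case class Session(sessionDuration: Long, sessionTotalEvents: Int)

// Generic sink: OUT is erased at runtime, so the compiler cannot verify the
// cast. It succeeds only if the element really is a Session.
class CastingSink[OUT] {
  var lastDuration: Long = 0L

  def invoke(value: OUT): Unit = {
    lastDuration = value.asInstanceOf[Session].sessionDuration
  }
}

// Sink fixed to Session: no cast is needed, and passing any other element
// type is rejected at compile time instead of failing while the job runs.
class SessionSink {
  var lastDuration: Long = 0L

  def invoke(value: Session): Unit = {
    lastDuration = value.sessionDuration
  }
}
```

Here a CastingSink[String] compiles without complaint but throws a ClassCastException on its first element, while the corresponding mistake with SessionSink would not compile.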

I hope this helps 😊

Feel free to get back to me for clarifications (on the mailing list)

Cheers

Thias




From: Daniel Vol <vo...@gmail.com>
Sent: Wednesday, 11 August 2021 14:47
To: user@flink.apache.org
Subject: Odd Serialization exception

I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO  o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is adding some metrics to the producer (marked in red):

class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}

Is anyone familiar with this who can point out what may cause this exception and how I should solve it?

Thanks!
This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

Re: Odd Serialization exception

Posted by Daniel Vol <vo...@gmail.com>.
Hi Guowei,

I am running on EMR 5.32.0 with Flink 1.11.2

In the meantime I did some tests and commented out part of the new code:

override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
//      val session = value.asInstanceOf[Session]
//      sessionDuration = 17L //session.getSessionDuration
//      sessionSize = 19 //session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    }

However, I still get Caused by: org.apache.flink.util.SerializedThrowable: null
So my assumption is that something is wrong with the "override def open()" method.

Thanks!


Re: Odd Serialization exception

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Daniel
Could you tell me the version of Flink you use? I want to look at the
corresponding code.
Best,
Guowei



Re: Odd Serialization exception

Posted by Daniel Vol <vo...@gmail.com>.
Hi Matthias,

First, thanks for the fast reply.
I am new to Flink, so I am probably missing a lot in terms of the flow and
the objects passed.

The motivation is to get internal data from the transferred OUT object in
order to send metrics. So I do downcast it, but from my perspective the cast
value is not forwarded (super is called with the original value) and is not
expected to be used in later steps (it is meant to be a local variable).
As I suspect that you are right, can you point me to how I can get internal
data from OUT without changing it or affecting the next steps?
Also, when I create the object, I specify the OUT type (which is Session):

val flinkKinesisProducer = new LogSessionProducer[Session](new KinesisEventSerializer[Session], producerConfig)

"… but of course I might be completely mistaken due to incomplete
information."
What kind of information can I supply?

Thanks a lot!

Daniel
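
One way to read fields from the element without a cast and without touching the object is to bound the type parameter. The sketch below is a library-free illustration of that idea only: SessionStats, Session, MeteredSink and the forward callback are all hypothetical names invented for this example, and forward merely stands in for whatever the real sink does with the element.

```scala
// Hypothetical trait exposing just the fields the metrics need.
trait SessionStats {
  def sessionDuration: Long
  def sessionTotalEvents: Int
}

// Hypothetical event type; the case class fields implement the trait members.
final case class Session(id: String, sessionDuration: Long, sessionTotalEvents: Int)
  extends SessionStats

// Still generic over OUT, but the upper bound lets the compiler prove that the
// fields exist, so no asInstanceOf is needed.
class MeteredSink[OUT <: SessionStats](forward: OUT => Unit) {
  var lastDuration: Long = 0L
  var lastSize: Int = 0

  def invoke(value: OUT): Unit = {
    // Reading the fields does not modify the element.
    lastDuration = value.sessionDuration
    lastSize = value.sessionTotalEvents
    // The same, unmodified instance is passed on.
    forward(value)
  }
}
```

Reading the values only copies them into the sink's own variables, so the instance handed to forward is the same object that was received.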

On 11 Aug 2021, at 17:28, Schwalbe Matthias <Ma...@viseca.ch>
wrote:

Hi Daniel,

At first glance there is one thing that catches my eye:
In the line 'val session = value.asInstanceOf[Session]' it looks like you
are downcasting the event from OUT to Session.
In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific
serializer[OUT] to transport events from one operator to the next (or at
least from one task to the next, if configured this way).
These serializers usually understand only one type, OUT in your case. Only
in certain circumstances is the Java object (the event) passed directly
from one operator to the next.

I guess this is what happened: your serializer, which only understands OUT,
cannot cope with a Session object …

… but of course I might be completely mistaken due to incomplete
information.

I hope this helps 😊

Feel free to get back to me for clarifications (on the mailing list)

Cheers

Thias


RE: Odd Serialization exception

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Daniel,

At first glance there is one thing that catches my eye:
In the line 'val session = value.asInstanceOf[Session]' it looks like you are downcasting the event from OUT to Session.
In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific serializer[OUT] to transport events from one operator to the next (or at least from one task to the next, if configured this way).
These serializers usually understand only one type, OUT in your case. Only in certain circumstances is the Java object (the event) passed directly from one operator to the next.

I guess this is what happened: your serializer, which only understands OUT, cannot cope with a Session object …

… but of course I might be completely mistaken due to incomplete information.

I hope this helps 😊

Feel free to get back to me for clarifications (on the mailing list)

Cheers

Thias




From: Daniel Vol <vo...@gmail.com>
Sent: Wednesday, 11 August 2021 14:47
To: user@flink.apache.org
Subject: Odd Serialization exception

I started to get the following exception:

2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8)] INFO  o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 134 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink: Unnamed (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: null
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.flushSync(FlinkKinesisProducer.java:411)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.snapshotState(FlinkKinesisProducer.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 common frames omitted

The only change from the version that does not have this issue is the addition of some metrics to the producer (marked in red):


class LogSessionProducer[OUT](schema: SerializationSchema[OUT], props: Properties)
  extends FlinkKinesisProducer[OUT](schema, props) with LazyLogging {

  // Counters are created in open(), so they are not serialized with the function.
  @transient private var sessionsProcessed: Counter = _
  @transient private var sessionsWritten: Counter = _
  @transient private var sessionWriteFailed: Counter = _

  // Latest values, exposed through the gauges registered in open().
  @transient private var sessionDuration: Long = 0
  @transient private var sessionSize: Int = 0

  override def open(parameters: Configuration): Unit = {
    val metrics = getRuntimeContext.getMetricGroup
    sessionsProcessed = metrics.counter("sessionsProcessed")
    sessionsWritten = metrics.counter("sessionsWritten")
    sessionWriteFailed = metrics.counter("sessionWriteFailed")
    metrics.gauge[Long, ScalaGauge[Long]]("sessionDuration", ScalaGauge[Long](() => sessionDuration))
    metrics.gauge[Int, ScalaGauge[Int]]("sessionSize", ScalaGauge[Int](() => sessionSize))
  }

  @throws[Exception]
  override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
    try {
      val session = value.asInstanceOf[Session]
      sessionDuration = session.getSessionDuration
      sessionSize = session.getSessionTotalEvents
      super.invoke(value, context)
      sessionsWritten.inc()
    } catch {
      case e: IllegalArgumentException if e.getMessage.contains("Data must be less than or equal to") =>
        logger.error("failed session ended = " + value, e)
        sessionWriteFailed.inc()
      case _: Throwable => sessionWriteFailed.inc()
    } finally sessionsProcessed.inc()
  }
}

Is anyone familiar with this and able to point out what may cause this exception and how I should solve it?

Thanks!
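
One thing that may be worth checking in the class above, as an assumption that this thread does not confirm: the overridden open does not call super.open(parameters), and the catch-all case in invoke hides every failure. If the parent's open is what creates the underlying Kinesis producer, the first visible error would only show up later, when the checkpoint flushes the sink. The sketch below reproduces that pattern in plain Scala; Base, Broken and Fixed are hypothetical stand-ins and not Flink code.

```scala
import scala.util.Try

// Hypothetical stand-in for a sink whose open() creates a client
// that both invoke() and flush() depend on.
class Base {
  private var client: java.lang.StringBuilder = null   // stays null until open() runs
  def open(): Unit = { client = new java.lang.StringBuilder }
  def invoke(value: String): Unit = {
    if (client == null) throw new RuntimeException("producer is not initialised")
    client.append(value)
  }
  def flush(): Int = client.length()                    // NullPointerException if open() never ran
}

// Same shape as the class in this thread: open() skips the parent call
// and invoke() catches every Throwable.
class Broken extends Base {
  var failed = 0
  override def open(): Unit = ()                        // parent initialisation is lost
  override def invoke(value: String): Unit =
    try super.invoke(value)
    catch { case _: Throwable => failed += 1 }          // failure is counted but hidden
}

// Same subclass with the parent call restored.
class Fixed extends Base {
  var failed = 0
  override def open(): Unit = super.open()
  override def invoke(value: String): Unit =
    try super.invoke(value)
    catch { case _: Throwable => failed += 1 }
}

val broken = new Broken
broken.open()
broken.invoke("a")                      // does not throw, the error is swallowed
val brokenFlush = Try(broken.flush())   // the problem becomes visible only here

val fixed = new Fixed
fixed.open()
fixed.invoke("a")
val fixedFlush = Try(fixed.flush())
```

If this matches the real job, the sessionWriteFailed counter would rise for every record while sessionsWritten stays at zero, which would be a quick way to test the assumption.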