Posted to user@flink.apache.org by Colin Williams <co...@gmail.com> on 2017/10/07 21:34:55 UTC

serialization error when using multiple metrics counters

I've created a RichMapFunction in Scala with multiple counters like:

   lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing, I noticed that a
single counter works without issues. However, with more than one counter I
get a serialization error.

Does anyone know how I can use multiple counters from my RichMapFunction,
or what I'm doing wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
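The Cause lines point at the root failure: Flink's ClosureCleaner serializes the function with plain Java serialization (InstantiationUtil.serializeObject calling ObjectOutputStream.writeObject), and that throws as soon as it reaches a SimpleCounter held in one of the function's fields. A minimal sketch that reproduces the same kind of exception without Flink; CounterStandIn and EagerCounters are hypothetical stand-ins for SimpleCounter and the RichMapFunction, not Flink classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.flink.metrics.SimpleCounter, which
// (per the trace above) does not implement java.io.Serializable.
final class CounterStandIn {
  private var count = 0L
  def inc(): Unit = count += 1
  def getCount: Long = count
}

// Hypothetical stand-in for the RichMapFunction. Flink user functions are
// Serializable, so every non-transient field they hold must be too.
final class EagerCounters extends Serializable {
  val successCounter = new CounterStandIn // created at construction time
}

// Roughly what the Cause section of the trace shows:
// InstantiationUtil.serializeObject -> ObjectOutputStream.writeObject.
def serializationError(obj: AnyRef): Option[NotSerializableException] =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj)
    out.close()
    None
  } catch {
    case e: NotSerializableException => Some(e)
  }

val error = serializationError(new EagerCounters)
// The exception message is the name of the class that could not be written.
println(error.map(_.getMessage))
```

Because successCounter is a plain val in this sketch, the counter exists from construction onward, so serialization fails whether or not the counter was ever used.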

Re: serialization error when using multiple metrics counters

Posted by Colin Williams <co...@gmail.com>.
Thanks everyone, and thank you very much Seth! Adding @transient to the
lazy vals is what I needed.

On Mon, Oct 9, 2017 at 1:34 PM, Seth Wiesman <sw...@mediamath.com> wrote:

Re: serialization error when using multiple metrics counters

Posted by Seth Wiesman <sw...@mediamath.com>.
When a Scala class contains a single lazy val, it is implemented using a boolean flag that tracks whether the field has been evaluated. When a class contains multiple lazy vals, they are implemented with a bit mask shared among the variables. This can lead to inconsistencies in whether serialization forces evaluation of a field. In general, lazy vals should always be marked @transient to get the expected behavior.

Seth
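A minimal sketch of what @transient changes, assuming plain Java serialization as in the stack trace: once a lazy val has been evaluated, its value sits in an ordinary instance field and is written out with the object; marking the lazy val @transient skips that field, and the deserialized copy runs the initializer again on first access. CounterStandIn, PlainLazyCounters and TransientLazyCounters are hypothetical stand-ins, not Flink classes, and the sketch does not model the bit-mask details described above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a metric counter that is not java.io.Serializable.
final class CounterStandIn {
  private var count = 0L
  def inc(): Unit = count += 1
  def getCount: Long = count
}

// Hypothetical function classes, each with two lazy counters.
final class PlainLazyCounters extends Serializable {
  lazy val successCounter = new CounterStandIn
  lazy val failedCounter = new CounterStandIn
}

final class TransientLazyCounters extends Serializable {
  @transient lazy val successCounter = new CounterStandIn
  @transient lazy val failedCounter = new CounterStandIn
}

// Serialize and deserialize with plain Java serialization.
// Returns Left if some reachable field is not serializable.
def roundTrip[T <: AnyRef](obj: T): Either[NotSerializableException, T] =
  try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    Right(in.readObject().asInstanceOf[T])
  } catch {
    case e: NotSerializableException => Left(e)
  }

// Force only one lazy val in each instance, as a first map() call would.
val plain = new PlainLazyCounters
plain.successCounter.inc()
val transientOnes = new TransientLazyCounters
transientOnes.successCounter.inc()

val plainResult = roundTrip(plain)             // fails: the evaluated counter is written
val transientResult = roundTrip(transientOnes) // succeeds: @transient fields are skipped

// The copy re-runs the initializer, so its counter starts from zero.
val restoredCount: Option[Long] = transientResult match {
  case Right(copy) => Some(copy.successCounter.getCount)
  case Left(_)     => None
}
```

In this sketch, whether the plain version fails depends on whether a lazy val was evaluated before serialization; the @transient version serializes the same way in both cases.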

From: Stephan Ewen <se...@apache.org>
Date: Monday, October 9, 2017 at 2:44 PM
To: Kostas Kloudas <k....@data-artisans.com>
Cc: Colin Williams <co...@gmail.com>, user <us...@flink.apache.org>
Subject: Re: serialization error when using multiple metrics counters

Re: serialization error when using multiple metrics counters

Posted by Stephan Ewen <se...@apache.org>.
Interesting. Is there a quirk in Scala whereby using multiple lazy variables
can result in eager initialization of some of them?

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <k....@data-artisans.com>
wrote:

Re: serialization error when using multiple metrics counters

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Colin,

Are you initializing your counters from within the open() method of your rich function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. If you create the counters outside open(), they may already be set on
your function when Flink serializes it to ship your code to the cluster, and since they cannot be serialized, you get this exception.

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

Thanks,
Kostas
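A minimal sketch of the open() pattern described above: the counters live in @transient vars that are still null when the function is serialized, and are only assigned once open() runs on the deserialized copy. In the Flink 1.x API the hook would be override def open(parameters: Configuration) on a RichMapFunction, with counters obtained from getRuntimeContext.getMetricGroup.counter(...) as in the snippet above. To keep the sketch runnable without Flink on the classpath, MetricGroupStandIn, CounterStandIn, ParseCounters and the map logic are hypothetical stand-ins, and roundTrip only imitates shipping the function to a worker.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins so the sketch runs without Flink on the classpath.
final class CounterStandIn {
  private var count = 0L
  def inc(): Unit = count += 1
  def getCount: Long = count
}

final class MetricGroupStandIn {
  def counter(name: String): CounterStandIn = new CounterStandIn
}

// Shaped like a RichMapFunction: counters are null while the object is
// serialized and are created in open(). @transient keeps them out of the
// serialized form even if the object were serialized after open().
final class ParseCounters extends Serializable {
  @transient private var successCounter: CounterStandIn = null
  @transient private var failedCounter: CounterStandIn = null
  @transient private var errorCounter: CounterStandIn = null

  // In Flink 1.x this would be `override def open(parameters: Configuration)`,
  // reading from getRuntimeContext.getMetricGroup instead of a parameter.
  def open(metrics: MetricGroupStandIn): Unit = {
    successCounter = metrics.counter("successfulParse")
    failedCounter = metrics.counter("failedParse")
    errorCounter = metrics.counter("errorParse")
  }

  // Hypothetical parse outcomes: "ok", "failed", anything else is an error.
  def map(outcome: String): String = {
    outcome match {
      case "ok"     => successCounter.inc()
      case "failed" => failedCounter.inc()
      case _        => errorCounter.inc()
    }
    outcome
  }

  def counts: (Long, Long, Long) =
    (successCounter.getCount, failedCounter.getCount, errorCounter.getCount)
}

// Imitates shipping the function: serialize on the client, deserialize on a worker.
def roundTrip[T <: AnyRef](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[T]
}

val shipped = roundTrip(new ParseCounters) // succeeds: no counters exist yet
shipped.open(new MetricGroupStandIn)       // counters are created on the "worker"
Seq("ok", "ok", "failed", "boom").foreach(outcome => shipped.map(outcome))
println(shipped.counts)                    // (2,1,1)
```

The design choice here is timing rather than laziness: the function object is serialized before any counter exists, and each deserialized instance creates its own counters in open().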
