Posted to users@zeppelin.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2020/04/19 08:05:14 UTC

Serialization issues with case classes with Flink

Hi,

I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].

   - If I define the case class C in some cell I get this error:

java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.


   - That sounds related to this
   https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
   it looks like the zeppelin flink interpreter is wrapping the case class
   definition as an inner class. I tried defining the case class C inside an
   object Types that I define in another cell. With that I also get a
   serialization exception.

org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)

I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.
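
To illustrate (a hypothetical sketch; the interpreter's real generated names like $line163.$read$$iw differ), the REPL-style wrapping turns a cell-level case class into an inner class, which is exactly what the error message complains about, while a member of a top-level object needs no outer instance:

```scala
// Hypothetical sketch, not Zeppelin's actual generated code: `Wrapper`
// stands in for the interpreter's synthetic $line163.$read$$iw... classes.
object ReplWrappingDemo extends App {

  class Wrapper {
    // Inner case class: every instance captures a reference to a Wrapper,
    // so a serializer cannot instantiate it without that outer instance.
    case class Inner(x: Int)
  }

  object Toplevel {
    // Member of a top-level object: no outer instance required,
    // which is what the error message asks for.
    case class Ok(x: Int)
  }

  val w = new Wrapper
  println(w.Inner(1))     // constructing Inner needs the outer instance `w`
  println(Toplevel.Ok(1)) // constructible without any outer instance
}
```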

Any thoughts about how I can fix this? Also, I understand that $line163 etc.
refer to the code in the cells; is there some convention I can use to work out
which line in the notebook those error messages refer to?

Thanks in advance,

Juan

Re: Serialization issues with case classes with Flink

Posted by Som Lima <so...@gmail.com>.
Looking at your original message, your first error was:

java.lang.IllegalArgumentException: requirement failed:

That is not a serialisation error.

As a golden rule of thumb, always look at the first error, irrespective of the
programming language.


On Mon, 20 Apr 2020, 03:13 Ravi Pullareddy, <ra...@minlog.com.au>
wrote:

> Hi Juan
>
>
>
> I see what your problem is. You have declared class C with field x of type
> Int. When you map to class fields you have to specify the type. Try the
> code below; it works.
>
>
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))
>
> xs.count
>
>
>
> benv.execute("Count Collection")
>
>
>
> Cheers
>
> Ravi Pullareddy
>
>
>
> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
> *Sent:* Monday, 20 April 2020 2:44 AM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Hi Jeff,
>
>
>
> I’ll try using POJO classes instead.
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zj...@gmail.com>
> wrote:
>
> Hi Juan,
>
>
>
> This is a Flink issue; I have created a ticket in the Flink community:
> https://issues.apache.org/jira/browse/FLINK-16969
>
> The workaround is to use a POJO class instead of a case class.
>
>
>
> Juan Rodríguez Hortalá <ju...@gmail.com> wrote on Sun, 19 Apr 2020 at
> 19:58:
>
> Thanks also to Som and Zahid for your answers. But as I show in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show up in the Flink Scala shell, so it looks like an issue with
> the Zeppelin interpreter for Flink.
>
>
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Hi Ravi,
>
>
>
> I posted another message with the minimal reproduction, I repeat it here:
>
>
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
>
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector is the problem.
>
>
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
>
>
> ```
>
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
>
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
> C(9), C(10))
>
> scala>
>
> ```
>
>
>
> Note I was running Zeppelin in a Docker container on my laptop, also using
> Flink's local mode.
>
>
>
> I think this is a problem with the Zeppelin integration with Flink, and how
> it processes case class definitions in a cell.
>
>
>
> Thanks for your answer,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
> ravi.pullareddy@minlog.com.au> wrote:
>
> Hi Juan
>
>
>
> I have written various applications for continuous processing of csv
> files. Please post your entire code and how you are mapping. It becomes
> easy to highlight the issue.
>
>
>
> Thanks
>
> Ravi Pullareddy
>
>
>
>
>
> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
> *Sent:* Sunday, 19 April 2020 6:25 PM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Just for the record, the Spark version of that works fine:
>
>
>
> ```
> %spark
>
> case class C2(x: Int)
>
> val xs = sc.parallelize(1 to 10)
> val csSpark = xs.map{C2(_)}
>
> csSpark.collect
>
>
>
> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
> C2(8), C2(9), C2(10))
>
> ```
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Minimal reproduction:
>
>    - First option
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
>    - Second option
>
> ```scala
>
> object Types {
>     case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
>
> ```
>
>
>
> defined object Types org.apache.flink.api.common.InvalidProgramException:
> Task not serializable at
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> at
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> at
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>
>
> Greetings,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Hi,
>
>
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a
> map operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
>
>    - If I define the case class C in some cell I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
>
>
>    - That sounds related to this
>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>    it looks like the zeppelin flink interpreter is wrapping the case class
>    definition as an inner class. I tried defining the case class C inside an
>    object Types that I define in another cell. With that I also get a
>    serialization exception.
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
>
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
>
>
> Any thoughts about how I can fix this? Also, I understand that $line163 etc.
> refer to the code in the cells; is there some convention I can use to work
> out which line in the notebook those error messages refer to?
>
>
>
> Thanks in advance,
>
>
>
> Juan
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>

RE: Serialization issues with case classes with Flink

Posted by Ravi Pullareddy <ra...@minlog.com.au>.
Hi Juan



I see what your problem is. You have declared class C with field x of type
Int. When you map to class fields you have to specify the type. Try the
code below; it works.



case class C(x: Int)

val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))

xs.count



benv.execute("Count Collection")



Cheers

Ravi Pullareddy



*From:* Juan Rodríguez Hortalá <ju...@gmail.com>
*Sent:* Monday, 20 April 2020 2:44 AM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink



Hi Jeff,



I’ll try using POJO classes instead.



Thanks,



Juan



On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zj...@gmail.com> wrote:

Hi Juan,



This is a Flink issue; I have created a ticket in the Flink community:
https://issues.apache.org/jira/browse/FLINK-16969

The workaround is to use a POJO class instead of a case class.



Juan Rodríguez Hortalá <ju...@gmail.com> wrote on Sun, 19 Apr 2020 at 19:58:

Thanks also to Som and Zahid for your answers. But as I show in my previous
answer, this issue happens without using the CSV source, and it doesn't
show up in the Flink Scala shell, so it looks like an issue with the Zeppelin
interpreter for Flink.



On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

Hi Ravi,



I posted another message with the minimal reproduction, I repeat it here:



```scala

case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count

```



defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided



As you can see, this minimal example doesn't use CSV files, so I don't
think the CSV connector is the problem.



I don't think this is a Flink issue either, as that example works fine in
the Flink shell:



```

root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local

scala> val xs = benv.fromCollection(1 to 10)
xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@6012bee8

scala> xs.collect
res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val cs = xs.map{C(_)}
cs: org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@36f807f5

scala> cs.collect
res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9),
C(10))

scala>

```



Note I was running Zeppelin in a Docker container on my laptop, also using
Flink's local mode.



I think this is a problem with the Zeppelin integration with Flink, and how
it processes case class definitions in a cell.



Thanks for your answer,



Juan





On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
ravi.pullareddy@minlog.com.au> wrote:

Hi Juan



I have written various applications for continuous processing of csv files.
Please post your entire code and how you are mapping. It becomes easy to
highlight the issue.



Thanks

Ravi Pullareddy





*From:* Juan Rodríguez Hortalá <ju...@gmail.com>
*Sent:* Sunday, 19 April 2020 6:25 PM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink



Just for the record, the Spark version of that works fine:



```
%spark

case class C2(x: Int)

val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}

csSpark.collect



res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))

```



Thanks,



Juan



On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

Minimal reproduction:

   - First option

```scala

case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count

```



defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided

   - Second option

```scala

object Types {
    case class C(x: Int)
}

val cs2 = xs.map{Types.C(_)}

cs2.count

```



defined object Types org.apache.flink.api.common.InvalidProgramException:
Task not serializable at
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)



Greetings,



Juan





On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

Hi,



I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].

   - If I define the case class C in some cell I get this error:

java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.



   - That sounds related to this
   https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
   it looks like the zeppelin flink interpreter is wrapping the case class
   definition as an inner class. I tried defining the case class C inside an
   object Types that I define in another cell. With that I also get a
   serialization exception.

org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)



I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.



Any thoughts about how I can fix this? Also, I understand that $line163 etc.
refer to the code in the cells; is there some convention I can use to work out
which line in the notebook those error messages refer to?



Thanks in advance,



Juan




-- 

Best Regards

Jeff Zhang

Re: Serialization issues with case classes with Flink

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Jeff,

I’ll try using POJO classes instead.

Thanks,

Juan

On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zj...@gmail.com> wrote:

> Hi Juan,
>
> This is a Flink issue; I have created a ticket in the Flink community:
> https://issues.apache.org/jira/browse/FLINK-16969
> The workaround is to use a POJO class instead of a case class.
>
> Juan Rodríguez Hortalá <ju...@gmail.com> wrote on Sun, 19 Apr 2020 at
> 19:58:
>
>> Thanks also to Som and Zahid for your answers. But as I show in my
>> previous answer, this issue happens without using the CSV source, and it
>> doesn't show up in the Flink Scala shell, so it looks like an issue with
>> the Zeppelin interpreter for Flink.
>>
>> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hortala@gmail.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> I posted another message with the minimal reproduction, I repeat it here:
>>>
>>> ```scala
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>> ```
>>>
>>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>>> org.apache.flink.api.scala.DataSet[C] =
>>> org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object. at
>>> scala.Predef$.require(Predef.scala:224) at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>> at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>> ... 125 elided
>>>
>>> As you can see, this minimal example doesn't use CSV files, so I don't
>>> think the CSV connector is the problem.
>>>
>>> I don't think this is a Flink issue either, as that example works fine
>>> in the Flink shell:
>>>
>>> ```
>>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
>>> scala> val xs = benv.fromCollection(1 to 10)
>>> xs: org.apache.flink.api.scala.DataSet[Int] =
>>> org.apache.flink.api.scala.DataSet@6012bee8
>>>
>>> scala> xs.collect
>>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>
>>> scala> val cs = xs.map{C(_)}
>>> cs: org.apache.flink.api.scala.DataSet[C] =
>>> org.apache.flink.api.scala.DataSet@36f807f5
>>>
>>> scala> cs.collect
>>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
>>> C(9), C(10))
>>>
>>> scala>
>>> ```
>>>
>>> Note I was running Zeppelin in a Docker container on my laptop, also
>>> using Flink's local mode.
>>>
>>> I think this is a problem with the Zeppelin integration with Flink, and
>>> how it processes case class definitions in a cell.
>>>
>>> Thanks for your answer,
>>>
>>> Juan
>>>
>>>
>>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>>> ravi.pullareddy@minlog.com.au> wrote:
>>>
>>>> Hi Juan
>>>>
>>>>
>>>>
>>>> I have written various applications for continuous processing of csv
>>>> files. Please post your entire code and how you are mapping. It becomes
>>>> easy to highlight the issue.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Ravi Pullareddy
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
>>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>>> *To:* users@zeppelin.apache.org
>>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>>
>>>>
>>>>
>>>> Just for the record, the Spark version of that works fine:
>>>>
>>>>
>>>>
>>>> ```
>>>> %spark
>>>>
>>>> case class C2(x: Int)
>>>>
>>>> val xs = sc.parallelize(1 to 10)
>>>> val csSpark = xs.map{C2(_)}
>>>>
>>>> csSpark.collect
>>>>
>>>>
>>>>
>>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6),
>>>> C2(7), C2(8), C2(9), C2(10))
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>>
>>>>
>>>> Juan
>>>>
>>>>
>>>>
>>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>>
>>>> Minimal reproduction:
>>>>
>>>>    - First option
>>>>
>>>> ```scala
>>>>
>>>> case class C(x: Int)
>>>>
>>>> val xs = benv.fromCollection(1 to 10)
>>>> val cs = xs.map{C(_)}
>>>>
>>>> cs.count
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>>>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>>>> org.apache.flink.api.scala.DataSet[C] =
>>>> org.apache.flink.api.scala.DataSet@205a35a
>>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>>> instance class, meaning it is not a member of a toplevel object, or of an
>>>> object contained in a toplevel object, therefore it requires an outer
>>>> instance to be instantiated, but we don't have a reference to the outer
>>>> instance. Please consider changing the outer class to an object. at
>>>> scala.Predef$.require(Predef.scala:224) at
>>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>>> ... 125 elided
>>>>
>>>>    - Second option
>>>>
>>>> ```scala
>>>>
>>>> object Types {
>>>>     case class C(x: Int)
>>>> }
>>>>
>>>> val cs2 = xs.map{Types.C(_)}
>>>>
>>>> cs2.count
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> defined object Types
>>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>> at
>>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>> at
>>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>> at
>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
>>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
>>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
>>>> Caused by: java.io.NotSerializableException:
>>>> org.apache.flink.api.scala.DataSet at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>
>>>>
>>>>
>>>> Greetings,
>>>>
>>>>
>>>>
>>>> Juan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>>> some csv files using benv.readCsvFile and it works ok. I have also defined
>>>> a case class C for the csv records. The problem happens when I apply a
>>>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>>>> convert it into a DataSet[C].
>>>>
>>>>    - If I define the case class C in some cell I get this error:
>>>>
>>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>>> an instance class, meaning it is not a member of a toplevel object, or of
>>>> an object contained in a toplevel object, therefore it requires an outer
>>>> instance to be instantiated, but we don't have a reference to the outer
>>>> instance. Please consider changing the outer class to an object.
>>>>
>>>>
>>>>
>>>>    - That sounds related to this
>>>>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>>>>    it looks like the zeppelin flink interpreter is wrapping the case class
>>>>    definition as an inner class. I tried defining the case class C inside an
>>>>    object Types that I define in another cell. With that I also get a
>>>>    serialization exception.
>>>>
>>>> org.apache.flink.api.common.InvalidProgramException: Task not
>>>> serializable
>>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>>
>>>>
>>>>
>>>> I guess that didn't work because the object Types is still defined
>>>> inside some class implicitly defined by the interpreter.
>>>>
>>>>
>>>>
>>>> Any thoughts about how I can fix this? Also, I understand that $line163
>>>> etc. refer to the code in the cells; is there some convention I can use
>>>> to work out which line in the notebook those error messages refer to?
>>>>
>>>>
>>>>
>>>> Thanks in advance,
>>>>
>>>>
>>>>
>>>> Juan
>>>>
>>>>
>
> --
> Best Regards
>
> Jeff Zhang
>

Re: Serialization issues with case classes with Flink

Posted by Jeff Zhang <zj...@gmail.com>.
Hi Juan,

This is an issue in Flink; I have created a ticket in the Flink community:
https://issues.apache.org/jira/browse/FLINK-16969
The workaround is to use a POJO class instead of a case class.
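A minimal sketch of what such a POJO could look like (the class name `CPojo` is my own; the thread's original type was `case class C(x: Int)`). Flink's POJO rules want a public no-argument constructor and public or getter/setter fields, and unlike the cell-defined case class a POJO needs no outer instance to be instantiated:

```scala
// Hypothetical POJO replacement for `case class C(x: Int)`.
// Flink treats a class as a POJO when it has a public no-arg
// constructor and public (or getter/setter) fields; deserializing
// it does not require a reference to any enclosing instance.
class CPojo(var x: Int) {
  def this() = this(0) // no-arg constructor required by Flink's POJO rules

  override def toString: String = s"CPojo($x)"

  override def equals(other: Any): Boolean = other match {
    case that: CPojo => that.x == x
    case _           => false
  }

  override def hashCode: Int = x
}
```

In a Zeppelin cell this would then be used as `val cs = xs.map(new CPojo(_))` in place of `xs.map{C(_)}`.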

Juan Rodríguez Hortalá <ju...@gmail.com> 于2020年4月19日周日
下午7:58写道:

> Thanks also to Som and Zahid for your answers. But as I show in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show up in the Flink Scala shell, so it looks like an issue with
> the Zeppelin interpreter for Flink.
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
>> Hi Ravi,
>>
>> I posted another message with the minimal reproduction, I repeat it here:
>>
>> ```scala
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>> ```
>>
>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>> org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object. at
>> scala.Predef$.require(Predef.scala:224) at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>> at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>> ... 125 elided
>>
>> As you can see, this minimal example doesn't use CSV files, so I don't
>> think the CSV connector is the problem.
>>
>> I don't think this is a Flink issue either, as that example works fine in
>> the Flink shell:
>>
>> ```
>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
>> scala> val xs = benv.fromCollection(1 to 10)
>> xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@6012bee8
>>
>> scala> xs.collect
>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>
>> scala> val cs = xs.map{C(_)}
>> cs: org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@36f807f5
>>
>> scala> cs.collect
>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
>> C(9), C(10))
>>
>> scala>
>> ```
>>
>> Note I was running Zeppelin in a Docker container on my laptop, also
>> using Flink's local mode.
>>
>> I think this is a problem with the Zeppelin integration with Flink, and
>> how it processes case class definitions in a cell.
>>
>> Thanks for your answer,
>>
>> Juan
>>
>>
>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>> ravi.pullareddy@minlog.com.au> wrote:
>>
>>> Hi Juan
>>>
>>>
>>>
>>> I have written various applications for continuous processing of CSV
>>> files. Please post your entire code and show how you are mapping; that
>>> makes it easier to pinpoint the issue.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ravi Pullareddy
>>>
>>>
>>>
>>>
>>>
>>> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>> *To:* users@zeppelin.apache.org
>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>
>>>
>>>
>>> Just for the record, the Spark version of that works fine:
>>>
>>>
>>>
>>> ```
>>> %spark
>>>
>>> case class C2(x: Int)
>>>
>>> val xs = sc.parallelize(1 to 10)
>>> val csSpark = xs.map{C2(_)}
>>>
>>> csSpark.collect
>>>
>>>
>>>
>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6),
>>> C2(7), C2(8), C2(9), C2(10))
>>>
>>> ```
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>>
>>> Juan
>>>
>>>
>>>
>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>
>>> Minimal reproduction:
>>>
>>>    - First option
>>>
>>> ```scala
>>>
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>>
>>> ```
>>>
>>>
>>>
>>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>>> org.apache.flink.api.scala.DataSet[C] =
>>> org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object. at
>>> scala.Predef$.require(Predef.scala:224) at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>> at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>> ... 125 elided
>>>
>>>    - Second option
>>>
>>> ```scala
>>>
>>> object Types {
>>>     case class C(x: Int)
>>> }
>>>
>>> val cs2 = xs.map{Types.C(_)}
>>>
>>> cs2.count
>>>
>>> ```
>>>
>>>
>>>
>>> defined object Types
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>> at
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>> at
>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>> at
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.flink.api.scala.DataSet at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>
>>>
>>>
>>> Greetings,
>>>
>>>
>>>
>>> Juan
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>> some csv files using benv.readCsvFile and it works ok. I have also defined
>>> a case class C for the csv records. The problem happens when I apply a
>>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>>> convert it into a DataSet[C].
>>>
>>>    - If I define the case class C in some cell I get this error:
>>>
>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>> an instance class, meaning it is not a member of a toplevel object, or of
>>> an object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>
>>>
>>>
>>>    - That sounds related to this
>>>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>>>    it looks like the zeppelin flink interpreter is wrapping the case class
>>>    definition as an inner class. I tried defining the case class C inside an
>>>    object Types that I define in another cell. With that I also get a
>>>    serialization exception.
>>>
>>> org.apache.flink.api.common.InvalidProgramException: Task not
>>> serializable
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>
>>>
>>>
>>> I guess that didn't work because the object Types is still defined
>>> inside some class implicitly defined by the interpreter.
>>>
>>>
>>>
>>> Any thoughts about how I can fix this? Also, I understand $line163 etc.
>>> refer to the code in the cells; is there some convention I can use to
>>> work out which line in the notebook those error messages refer to?
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>>
>>>
>>> Juan
>>>
>>>

-- 
Best Regards

Jeff Zhang

Re: Serialization issues with case classes with Flink

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Thanks also to Som and Zahid for your answers. But as I show in my previous
answer, this issue happens without using the CSV source, and it doesn't
show up in the Flink Scala shell, so it looks like an issue with the
Zeppelin interpreter for Flink.

On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

> Hi Ravi,
>
> I posted another message with the minimal reproduction, I repeat it here:
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector is the problem.
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
> ```
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
> C(9), C(10))
>
> scala>
> ```
>
> Note I was running Zeppelin in a Docker container on my laptop, also using
> Flink's local mode.
>
> I think this is a problem with the Zeppelin integration with Flink, and how
> it processes case class definitions in a cell.
>
> Thanks for your answer,
>
> Juan
>
>
> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
> ravi.pullareddy@minlog.com.au> wrote:
>
>> Hi Juan
>>
>>
>>
>> I have written various applications for continuous processing of CSV
>> files. Please post your entire code and show how you are mapping; that
>> makes it easier to pinpoint the issue.
>>
>>
>>
>> Thanks
>>
>> Ravi Pullareddy
>>
>>
>>
>>
>>
>> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
>> *Sent:* Sunday, 19 April 2020 6:25 PM
>> *To:* users@zeppelin.apache.org
>> *Subject:* Re: Serialization issues with case classes with Flink
>>
>>
>>
>> Just for the record, the Spark version of that works fine:
>>
>>
>>
>> ```
>> %spark
>>
>> case class C2(x: Int)
>>
>> val xs = sc.parallelize(1 to 10)
>> val csSpark = xs.map{C2(_)}
>>
>> csSpark.collect
>>
>>
>>
>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
>> C2(8), C2(9), C2(10))
>>
>> ```
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Juan
>>
>>
>>
>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>> juan.rodriguez.hortala@gmail.com> wrote:
>>
>> Minimal reproduction:
>>
>>    - First option
>>
>> ```scala
>>
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>>
>> ```
>>
>>
>>
>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>> org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object. at
>> scala.Predef$.require(Predef.scala:224) at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>> at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>> ... 125 elided
>>
>>    - Second option
>>
>> ```scala
>>
>> object Types {
>>     case class C(x: Int)
>> }
>>
>> val cs2 = xs.map{Types.C(_)}
>>
>> cs2.count
>>
>> ```
>>
>>
>>
>> defined object Types org.apache.flink.api.common.InvalidProgramException:
>> Task not serializable at
>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
>> Caused by: java.io.NotSerializableException:
>> org.apache.flink.api.scala.DataSet at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>
>>
>>
>> Greetings,
>>
>>
>>
>> Juan
>>
>>
>>
>>
>>
>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>> juan.rodriguez.hortala@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> I'm using the Flink interpreter and the benv environment. I'm reading
>> some csv files using benv.readCsvFile and it works ok. I have also defined
>> a case class C for the csv records. The problem happens when I apply a
>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>> convert it into a DataSet[C].
>>
>>    - If I define the case class C in some cell I get this error:
>>
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object.
>>
>>
>>
>>    - That sounds related to this
>>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>>    it looks like the zeppelin flink interpreter is wrapping the case class
>>    definition as an inner class. I tried defining the case class C inside an
>>    object Types that I define in another cell. With that I also get a
>>    serialization exception.
>>
>> org.apache.flink.api.common.InvalidProgramException: Task not
>> serializable
>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>
>>
>>
>> I guess that didn't work because the object Types is still defined inside
>> some class implicitly defined by the interpreter.
>>
>>
>>
>> Any thoughts about how I can fix this? Also, I understand $line163 etc.
>> refer to the code in the cells; is there some convention I can use to
>> work out which line in the notebook those error messages refer to?
>>
>>
>>
>> Thanks in advance,
>>
>>
>>
>> Juan
>>
>>

Re: Serialization issues with case classes with Flink

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Ravi,

I posted another message with the minimal reproduction, I repeat it here:

```scala
case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count
```

defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided

As you can see, this minimal example doesn't use CSV files, so I don't
think the CSV connector is the problem.

I don't think this is a Flink issue either, as that example works fine in
the Flink shell:

```
root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
scala> val xs = benv.fromCollection(1 to 10)
xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@6012bee8

scala> xs.collect
res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val cs = xs.map{C(_)}
cs: org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@36f807f5

scala> cs.collect
res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9),
C(10))

scala>
```

Note I was running Zeppelin in a Docker container on my laptop, also using
Flink's local mode.

I think this is a problem with the Zeppelin integration with Flink, and how
it processes case class definitions in a cell.

Thanks for your answer,

Juan


On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
ravi.pullareddy@minlog.com.au> wrote:

> Hi Juan
>
>
>
> I have written various applications for continuous processing of CSV
> files. Please post your entire code and show how you are mapping; that
> makes it easier to pinpoint the issue.
>
>
>
> Thanks
>
> Ravi Pullareddy
>
>
>
>
>
> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
> *Sent:* Sunday, 19 April 2020 6:25 PM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Just for the record, the Spark version of that works fine:
>
>
>
> ```
> %spark
>
> case class C2(x: Int)
>
> val xs = sc.parallelize(1 to 10)
> val csSpark = xs.map{C2(_)}
>
> csSpark.collect
>
>
>
> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
> C2(8), C2(9), C2(10))
>
> ```
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Minimal reproduction:
>
>    - First option
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
>    - Second option
>
> ```scala
>
> object Types {
>     case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
>
> ```
>
>
>
> defined object Types org.apache.flink.api.common.InvalidProgramException:
> Task not serializable at
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> at
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> at
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>
>
> Greetings,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Hi,
>
>
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a
> map operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
>
>    - If I define the case class C in some cell I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
>
>
>    - That sounds related to this
>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>    it looks like the zeppelin flink interpreter is wrapping the case class
>    definition as an inner class. I tried defining the case class C inside an
>    object Types that I define in another cell. With that I also get a
>    serialization exception.
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
>
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
>
>
> Any thoughts about how I can fix this? Also, I understand $line163 etc.
> refer to the code in the cells; is there some convention I can use to
> work out which line in the notebook those error messages refer to?
>
>
>
> Thanks in advance,
>
>
>
> Juan
>
>

RE: Serialization issues with case classes with Flink

Posted by Ravi Pullareddy <ra...@minlog.com.au>.
Hi Juan



I have written various applications for continuous processing of CSV files.
Please post your entire code and show how you are mapping; that makes it
easier to pinpoint the issue.



Thanks

Ravi Pullareddy





*From:* Juan Rodríguez Hortalá <ju...@gmail.com>
*Sent:* Sunday, 19 April 2020 6:25 PM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink



Just for the record, the Spark version of that works fine:



```
%spark

case class C2(x: Int)

val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}

csSpark.collect



res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))

```



Thanks,



Juan



On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

Minimal reproduction:

   - First option

```scala

case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count

```



defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided

   - Second option

```scala

object Types {
    case class C(x: Int)
}

val cs2 = xs.map{Types.C(_)}

cs2.count

```



defined object Types org.apache.flink.api.common.InvalidProgramException:
Task not serializable at
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
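The `Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet` line above suggests the map function's closure captured a `DataSet` handle itself (likely `xs` via the interpreter's wrapper object). The failure mode can be sketched without Flink; all names below (`Handle`, `MapFn`) are illustrative:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable runtime handle such as a DataSet.
class Handle

// A serializable function whose closure drags in the handle: Java
// serialization fails on the captured field, which is the shape of
// Flink's "Task not serializable" error.
class MapFn(val captured: Handle) extends (Int => Int) with Serializable {
  def apply(x: Int): Int = x + 1
}

val failed =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream())
      .writeObject(new MapFn(new Handle))
    false
  } catch { case _: NotSerializableException => true }
```

Flink's ClosureCleaner tries to null out such accidental captures, but cannot when the reference is a field of the generated interpreter wrapper rather than of the lambda itself.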



Greetings,



Juan





On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

Hi,



I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].

   - If I define the case class C in some cell I get this error:

java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.



   - That sounds related to this
   https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
   it looks like the zeppelin flink interpreter is wrapping the case class
   definition as an inner class. I tried defining the case class C inside an
   object Types that I define in another cell. With that I also get a
   serialization exception.

org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)



I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.



Any thoughts about how I can fix this? Also, I understand $line163 etc.
refer to the code in the cells; is there some convention I can use to
understand which line in the notebook those error messages refer to?



Thanks in advance,



Juan

Re: Serialization issues with case classes with Flink

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Just for the record, the Spark version of that works fine:

```
%spark

case class C2(x: Int)

val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}

csSpark.collect

res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))
```

Thanks,

Juan

On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:

> Minimal reproduction:

Re: Serialization issues with case classes with Flink

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Minimal reproduction:

   - First option

```scala
case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count
```

defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided

   - Second option

```scala
object Types {
    case class C(x: Int)
}

val cs2 = xs.map{Types.C(_)}

cs2.count
```

defined object Types org.apache.flink.api.common.InvalidProgramException:
Task not serializable at
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
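One possible workaround, sketched here under the assumption that the root
cause is the REPL-generated outer class: stay with Scala tuples, which are
top-level library classes whose constructors Flink's tuple serializers can
invoke without an outer instance.

```scala
// Hedged sketch, not a verified fix: Tuple1 is a top-level class from the
// Scala standard library, so the case-class serializer can look up its
// constructor without needing a reference to the REPL wrapper object.
val xs = benv.fromCollection(1 to 10)
val cs = xs.map(x => Tuple1(x)) // DataSet[Tuple1[Int]] instead of DataSet[C]
cs.count
```

Another commonly suggested route is to compile the case class into a separate
jar and add it to the interpreter's classpath, so it becomes a genuine
top-level class.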

Greetings,

Juan



Re: Serialization issues with case classes with Flink

Posted by Zahid Rahman <za...@gmail.com>.
You may also want to look at this
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
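As a small, hedged illustration of one knob that post discusses (assuming
Flink's ExecutionConfig API is reachable through Zeppelin's benv handle):

```scala
// Sketch: make the job fail fast whenever Flink would fall back to generic
// (Kryo) serialization, so unsupported types are reported when the program
// is built rather than failing later at runtime.
benv.getConfig.disableGenericTypes()
```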


On Sun, 19 Apr 2020, 09:05 Juan Rodríguez Hortalá, <
juan.rodriguez.hortala@gmail.com> wrote:


Re: Serialization issues with case classes with Flink

Posted by Som Lima <so...@gmail.com>.
You may wish to have a look at the Apache Avro project.

It shows how to generate source code for schema classes, used to create
objects (for example when reading CSV files) for serialization and
deserialization.

https://avro.apache.org/docs/current/gettingstartedjava.html

The Avro project is mentioned in the Flink documentation.
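For concreteness, the getting-started guide linked above works from a JSON
schema file; a minimal record schema corresponding to the case class C
discussed in this thread might look like this (all names are illustrative):

```json
{
  "namespace": "example.avro",
  "type": "record",
  "name": "C",
  "fields": [
    {"name": "x", "type": "int"}
  ]
}
```

Compiling such a schema, as shown in the guide, yields a genuine top-level
class, which sidesteps the interpreter's inner-class wrapping.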


