Posted to users@zeppelin.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2020/04/19 08:05:14 UTC
Serialization issues with case classes with Flink
Hi,
I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].
- If I define the case class C in some cell I get this error:
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.
- That sounds related to this
https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
it looks like the zeppelin flink interpreter is wrapping the case class
definition as an inner class. I tried defining the case class C inside an
object Types that I define in another cell. With that I also get a
serialization exception.
org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.
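[Editor's illustration] The wrapping effect described above can be reproduced in plain Scala, without Flink or Zeppelin. This is a hypothetical sketch: `Wrapper` stands in for the synthetic class the interpreter generates around each cell, and it shows why a reflective serializer cannot construct the inner case class on its own.

```scala
// Sketch of what the interpreter does: each cell is compiled inside a
// synthetic wrapper class, so a "top-level" case class becomes an inner class.
class Wrapper {
  case class C(x: Int) // inner case class: each instance captures a Wrapper reference
}

val outer = new Wrapper
val c = outer.C(1) // constructing C requires the outer instance `outer`
// A serializer that instantiates C reflectively has no outer instance to
// supply, which is exactly what Flink's error message complains about.
println(c) // prints C(1)
```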
Any thoughts on how I can fix this? Also, I understand $line163 etc. refer
to the code in the cells; is there some convention I can use to understand
which line in the notebook those error messages refer to?
Thanks in advance,
Juan
Re: Serialization issues with case classes with Flink
Posted by Som Lima <so...@gmail.com>.
Looking at your original message
Your first error was
java.lang.IllegalArgumentException: requirement failed:
That is not a serialization error.
As a rule of thumb, always look at the first error, irrespective of the
programming language.
On Mon, 20 Apr 2020, 03:13 Ravi Pullareddy, <ra...@minlog.com.au>
wrote:
> Hi Juan
>
>
>
> I see what your problem is. You have declared class C with field x of type
> Int. When you map to class fields you have to specify the type. Try the
> code below; it works.
>
>
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))
>
> xs.count
>
>
>
> benv.execute("Count Collection")
>
>
>
> Cheers
>
> Ravi Pullareddy
>
>
>
> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
> *Sent:* Monday, 20 April 2020 2:44 AM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Hi Jeff,
>
>
>
> I’ll try using POJO classes instead.
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zj...@gmail.com> wrote:
>
> Hi Juan,
>
>
>
> This is an issue with Flink; I have created a ticket in the Flink community:
> https://issues.apache.org/jira/browse/FLINK-16969
>
> The workaround is to use a POJO class instead of a case class.
>
>
>
> Juan Rodríguez Hortalá <ju...@gmail.com> wrote on Sun, 19 Apr 2020 at
> 7:58 PM:
>
> Thanks also to Som and Zahid for your answers. But as I show in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show in the Flink Scala shell, so it looks like an issue with the
> Zeppelin interpreter for Flink.
>
>
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Hi Ravi,
>
>
>
> I posted another message with the minimal reproduction, I repeat it here:
>
>
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
>
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector is the problem.
>
>
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
>
>
> ```
>
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
>
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
> C(9), C(10))
>
> scala>
>
> ```
>
>
>
> Note I was running Zeppelin in a Docker container on my laptop, also using
> Flink's local mode.
>
>
>
> I think this is a problem with the Zeppelin integration with Flink, and how
> it processes case class definitions in a cell.
>
>
>
> Thanks for your answer,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
> ravi.pullareddy@minlog.com.au> wrote:
>
> Hi Juan
>
>
>
> I have written various applications for continuous processing of csv
> files. Please post your entire code and how you are mapping. It becomes
> easy to highlight the issue.
>
>
>
> Thanks
>
> Ravi Pullareddy
>
>
>
>
>
> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
> *Sent:* Sunday, 19 April 2020 6:25 PM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Just for the record, the Spark version of that works fine:
>
>
>
> ```
> %spark
>
> case class C2(x: Int)
>
> val xs = sc.parallelize(1 to 10)
> val csSpark = xs.map{C2(_)}
>
> csSpark.collect
>
>
>
> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
> C2(8), C2(9), C2(10))
>
> ```
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Minimal reproduction:
>
> - First option
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
> - Second option
>
> ```scala
>
> object Types {
> case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
>
> ```
>
>
>
> defined object Types org.apache.flink.api.common.InvalidProgramException:
> Task not serializable at
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> at
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> at
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>
>
> Greetings,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Hi,
>
>
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a
> map operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
>
> - If I define the case class C in some cell I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
>
>
> - That sounds related to this
> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
> it looks like the zeppelin flink interpreter is wrapping the case class
> definition as an inner class. I tried defining the case class C inside an
> object Types that I define in another cell. With that I also get a
> serialization exception.
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
>
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
>
>
> Any thoughts on how I can fix this? Also, I understand $line163 etc.
> refer to the code in the cells; is there some convention I can use to
> understand which line in the notebook those error messages refer to?
>
>
>
> Thanks in advance,
>
>
>
> Juan
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>
RE: Serialization issues with case classes with Flink
Posted by Ravi Pullareddy <ra...@minlog.com.au>.
Hi Juan
I see what your problem is. You have declared class C with field x of type
Int. When you map to class fields you have to specify the type. Try the
code below; it works.
case class C(x: Int)
val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))
xs.count
benv.execute("Count Collection")
Cheers
Ravi Pullareddy
*From:* Juan Rodríguez Hortalá <ju...@gmail.com>
*Sent:* Monday, 20 April 2020 2:44 AM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink
Hi Jeff,
I’ll try using POJO classes instead.
Thanks,
Juan
On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zj...@gmail.com> wrote:
Hi Juan,
This is an issue with Flink; I have created a ticket in the Flink community:
https://issues.apache.org/jira/browse/FLINK-16969
The workaround is to use a POJO class instead of a case class.
Juan Rodríguez Hortalá <ju...@gmail.com> wrote on Sun, 19 Apr 2020 at
7:58 PM:
Thanks also to Som and Zahid for your answers. But as I show in my previous
answer, this issue happens without using the CSV source, and it doesn't
show in the Flink Scala shell, so it looks like an issue with the Zeppelin
interpreter for Flink.
On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:
Hi Ravi,
I posted another message with the minimal reproduction, I repeat it here:
```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```
defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided
As you can see, this minimal example doesn't use CSV files, so I don't
think the CSV connector is the problem.
I don't think this is a Flink issue either, as that example works fine in
the Flink shell:
```
root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
scala> val xs = benv.fromCollection(1 to 10)
xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@6012bee8
scala> xs.collect
res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val cs = xs.map{C(_)}
cs: org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@36f807f5
scala> cs.collect
res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9),
C(10))
scala>
```
Note I was running Zeppelin in a Docker container on my laptop, also using
Flink's local mode.
I think this is a problem with the Zeppelin integration with Flink, and how
it processes case class definitions in a cell.
Thanks for your answer,
Juan
On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
ravi.pullareddy@minlog.com.au> wrote:
Hi Juan
I have written various applications for continuous processing of csv files.
Please post your entire code and how you are mapping. It becomes easy to
highlight the issue.
Thanks
Ravi Pullareddy
*From:* Juan Rodríguez Hortalá <ju...@gmail.com>
*Sent:* Sunday, 19 April 2020 6:25 PM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink
Just for the record, the Spark version of that works fine:
```
%spark
case class C2(x: Int)
val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}
csSpark.collect
res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))
```
Thanks,
Juan
On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:
Minimal reproduction:
- First option
```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```
defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided
- Second option
```scala
object Types {
case class C(x: Int)
}
val cs2 = xs.map{Types.C(_)}
cs2.count
```
defined object Types org.apache.flink.api.common.InvalidProgramException:
Task not serializable at
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
Greetings,
Juan
On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:
Hi,
I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].
- If I define the case class C in some cell I get this error:
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.
- That sounds related to this
https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
it looks like the zeppelin flink interpreter is wrapping the case class
definition as an inner class. I tried defining the case class C inside an
object Types that I define in another cell. With that I also get a
serialization exception.
org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.
Any thoughts on how I can fix this? Also, I understand $line163 etc. refer
to the code in the cells; is there some convention I can use to understand
which line in the notebook those error messages refer to?
Thanks in advance,
Juan
--
Best Regards
Jeff Zhang
Re: Serialization issues with case classes with Flink
Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Jeff,
I’ll try using POJO classes instead.
Thanks,
Juan
On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zj...@gmail.com> wrote:
> Hi Juan,
>
> This is an issue with Flink; I have created a ticket in the Flink community:
> https://issues.apache.org/jira/browse/FLINK-16969
> The workaround is to use a POJO class instead of a case class.
>
> Juan Rodríguez Hortalá <ju...@gmail.com> wrote on Sun, 19 Apr 2020
> at 7:58 PM:
>
>> Thanks also to Som and Zahid for your answers. But as I show in my
>> previous answer, this issue happens without using the CSV source, and it
>> doesn't show in the Flink Scala shell, so it looks like an issue with the
>> Zeppelin interpreter for Flink.
>>
>> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hortala@gmail.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> I posted another message with the minimal reproduction, I repeat it here:
>>>
>>> ```scala
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>> ```
>>>
>>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>>> org.apache.flink.api.scala.DataSet[C] =
>>> org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object. at
>>> scala.Predef$.require(Predef.scala:224) at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>> at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>> ... 125 elided
>>>
>>> As you can see, this minimal example doesn't use CSV files, so I don't
>>> think the CSV connector is the problem.
>>>
>>> I don't think this is a Flink issue either, as that example works fine
>>> in the Flink shell:
>>>
>>> ```
>>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
>>> scala> val xs = benv.fromCollection(1 to 10)
>>> xs: org.apache.flink.api.scala.DataSet[Int] =
>>> org.apache.flink.api.scala.DataSet@6012bee8
>>>
>>> scala> xs.collect
>>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>
>>> scala> val cs = xs.map{C(_)}
>>> cs: org.apache.flink.api.scala.DataSet[C] =
>>> org.apache.flink.api.scala.DataSet@36f807f5
>>>
>>> scala> cs.collect
>>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
>>> C(9), C(10))
>>>
>>> scala>
>>> ```
>>>
>>> Note I was running Zeppelin in a Docker container on my laptop, also
>>> using Flink's local mode.
>>>
>>> I think this is a problem with the Zeppelin integration with Flink, and
>>> how it processes case class definitions in a cell.
>>>
>>> Thanks for your answer,
>>>
>>> Juan
>>>
>>>
>>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>>> ravi.pullareddy@minlog.com.au> wrote:
>>>
>>>> Hi Juan
>>>>
>>>>
>>>>
>>>> I have written various applications for continuous processing of csv
>>>> files. Please post your entire code and how you are mapping. It becomes
>>>> easy to highlight the issue.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Ravi Pullareddy
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
>>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>>> *To:* users@zeppelin.apache.org
>>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>>
>>>>
>>>>
>>>> Just for the record, the Spark version of that works fine:
>>>>
>>>>
>>>>
>>>> ```
>>>> %spark
>>>>
>>>> case class C2(x: Int)
>>>>
>>>> val xs = sc.parallelize(1 to 10)
>>>> val csSpark = xs.map{C2(_)}
>>>>
>>>> csSpark.collect
>>>>
>>>>
>>>>
>>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6),
>>>> C2(7), C2(8), C2(9), C2(10))
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>>
>>>>
>>>> Juan
>>>>
>>>>
>>>>
>>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>>
>>>> Minimal reproduction:
>>>>
>>>> - First option
>>>>
>>>> ```scala
>>>>
>>>> case class C(x: Int)
>>>>
>>>> val xs = benv.fromCollection(1 to 10)
>>>> val cs = xs.map{C(_)}
>>>>
>>>> cs.count
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>>>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>>>> org.apache.flink.api.scala.DataSet[C] =
>>>> org.apache.flink.api.scala.DataSet@205a35a
>>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>>> instance class, meaning it is not a member of a toplevel object, or of an
>>>> object contained in a toplevel object, therefore it requires an outer
>>>> instance to be instantiated, but we don't have a reference to the outer
>>>> instance. Please consider changing the outer class to an object. at
>>>> scala.Predef$.require(Predef.scala:224) at
>>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>>> at
>>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>>> ... 125 elided
>>>>
>>>> - Second option
>>>>
>>>> ```scala
>>>>
>>>> object Types {
>>>> case class C(x: Int)
>>>> }
>>>>
>>>> val cs2 = xs.map{Types.C(_)}
>>>>
>>>> cs2.count
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>> defined object Types
>>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>> at
>>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>> at
>>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>> at
>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
>>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
>>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
>>>> Caused by: java.io.NotSerializableException:
>>>> org.apache.flink.api.scala.DataSet at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>
>>>>
>>>>
>>>> Greetings,
>>>>
>>>>
>>>>
>>>> Juan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>>> some csv files using benv.readCsvFile and it works ok. I have also defined
>>>> a case class C for the csv records. The problem happens when I apply a
>>>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>>>> convert it into a DataSet[C].
>>>>
>>>> - If I define the case class C in some cell I get this error:
>>>>
>>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>>> an instance class, meaning it is not a member of a toplevel object, or of
>>>> an object contained in a toplevel object, therefore it requires an outer
>>>> instance to be instantiated, but we don't have a reference to the outer
>>>> instance. Please consider changing the outer class to an object.
>>>>
>>>>
>>>>
>>>> - That sounds related to this
>>>> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>>>> it looks like the zeppelin flink interpreter is wrapping the case class
>>>> definition as an inner class. I tried defining the case class C inside an
>>>> object Types that I define in another cell. With that I also get a
>>>> serialization exception.
>>>>
>>>> org.apache.flink.api.common.InvalidProgramException: Task not
>>>> serializable
>>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>>
>>>>
>>>>
>>>> I guess that didn't work because the object Types is still defined
>>>> inside some class implicitly defined by the interpreter.
>>>>
>>>>
>>>>
>>>> Any thoughts on how I can fix this? Also, I understand $line163 etc.
>>>> refer to the code in the cells; is there some convention I can use to
>>>> understand which line in the notebook those error messages refer to?
>>>>
>>>>
>>>>
>>>> Thanks in advance,
>>>>
>>>>
>>>>
>>>> Juan
>>>>
>>>>
>
> --
> Best Regards
>
> Jeff Zhang
>
Re: Serialization issues with case classes with Flink
Posted by Jeff Zhang <zj...@gmail.com>.
Hi Juan,
This is an issue in Flink; I have created a ticket in the Flink community:
https://issues.apache.org/jira/browse/FLINK-16969
The workaround is to use a POJO class instead of a case class.
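For reference, a POJO replacement for the thread's `case class C(x: Int)` might look like the following. This is a minimal sketch, assuming Flink's usual POJO requirements (a public no-argument constructor and public fields, or getters/setters); the class name `C` just mirrors the examples above:

```java
// Minimal sketch of a POJO equivalent of `case class C(x: Int)`.
// Flink can treat a class as a POJO when it has a public no-arg
// constructor and its fields are public (or have getters/setters),
// which avoids the outer-instance problem of REPL-defined case classes.
class C {
    public int x;

    public C() {}                      // no-arg constructor required by Flink

    public C(int x) { this.x = x; }

    @Override
    public String toString() { return "C(" + x + ")"; }
}
```

In a notebook cell you could then write something like `xs.map(x => new C(x))` in the Scala API; since `C` is a plain class rather than a case class compiled inside the interpreter's wrapper, no outer instance is needed to construct it.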
Juan Rodríguez Hortalá <ju...@gmail.com> 于2020年4月19日周日
下午7:58写道:
> Thanks also to Som and Zahid for your answers. But as I show in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show in the Flink Scala shell, so it looks like an issue with
> the Zeppelin interpreter for Flink.
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
>> Hi Ravi,
>>
>> I posted another message with the minimal reproduction, I repeat it here:
>>
>> ```scala
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>> ```
>>
>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>> org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object. at
>> scala.Predef$.require(Predef.scala:224) at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>> at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>> ... 125 elided
>>
>> As you can see, this minimal example doesn't use CSV files, so I don't
>> think the CSV connector is the problem.
>>
>> I don't think this is a Flink issue either, as that example works fine in
>> the Flink shell:
>>
>> ```
>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
>> scala> val xs = benv.fromCollection(1 to 10)
>> xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@6012bee8
>>
>> scala> xs.collect
>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>
>> scala> val cs = xs.map{C(_)}
>> cs: org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@36f807f5
>>
>> scala> cs.collect
>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
>> C(9), C(10))
>>
>> scala>
>> ```
>>
>> Note I was running Zeppelin in a Docker container on my laptop, also
>> using Flink's local mode.
>>
>> I think this is a problem with the Zeppelin integration with Flink, and
>> how it processes case class definitions in a cell.
>>
>> Thanks for your answer,
>>
>> Juan
>>
>>
>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>> ravi.pullareddy@minlog.com.au> wrote:
>>
>>> Hi Juan
>>>
>>>
>>>
>>> I have written various applications for continuous processing of csv
>>> files. Please post your entire code and how you are mapping. It becomes
>>> easy to highlight the issue.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ravi Pullareddy
>>>
>>>
>>>
>>>
>>>
>>> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>> *To:* users@zeppelin.apache.org
>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>
>>>
>>>
>>> Just for the record, the Spark version of that works fine:
>>>
>>>
>>>
>>> ```
>>> %spark
>>>
>>> case class C2(x: Int)
>>>
>>> val xs = sc.parallelize(1 to 10)
>>> val csSpark = xs.map{C2(_)}
>>>
>>> csSpark.collect
>>>
>>>
>>>
>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6),
>>> C2(7), C2(8), C2(9), C2(10))
>>>
>>> ```
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>>
>>> Juan
>>>
>>>
>>>
>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>
>>> Minimal reproduction:
>>>
>>> - First option
>>>
>>> ```scala
>>>
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>>
>>> ```
>>>
>>>
>>>
>>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>>> org.apache.flink.api.scala.DataSet[C] =
>>> org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object. at
>>> scala.Predef$.require(Predef.scala:224) at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>> at
>>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>> ... 125 elided
>>>
>>> - Second option
>>>
>>> ```scala
>>>
>>> object Types {
>>> case class C(x: Int)
>>> }
>>>
>>> val cs2 = xs.map{Types.C(_)}
>>>
>>> cs2.count
>>>
>>> ```
>>>
>>>
>>>
>>> defined object Types
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>> at
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>> at
>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>> at
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.flink.api.scala.DataSet at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>
>>>
>>>
>>> Greetings,
>>>
>>>
>>>
>>> Juan
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hortala@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>> some csv files using benv.readCsvFile and it works ok. I have also defined
>>> a case class C for the csv records. The problem happens when I apply a
>>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>>> convert it into a DataSet[C].
>>>
>>> - If I define the case class C in some cell I get this error:
>>>
>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>> an instance class, meaning it is not a member of a toplevel object, or of
>>> an object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>
>>>
>>>
>>> - That sounds related to this
>>> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>>> it looks like the zeppelin flink interpreter is wrapping the case class
>>> definition as an inner class. I tried defining the case class C inside an
>>> object Types that I define in another cell. With that I also get a
>>> serialization exception.
>>>
>>> org.apache.flink.api.common.InvalidProgramException: Task not
>>> serializable
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>
>>>
>>>
>>> I guess that didn't work because the object Types is still defined
>>> inside some class implicitly defined by the interpreter.
>>>
>>>
>>>
>>> Any thoughts about how I can fix this? Also, I understand $line163 etc.
>>> refer to the code in the cells. Is there some convention I can use to
>>> understand which line in the notebook those error messages refer to?
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>>
>>>
>>> Juan
>>>
>>>
--
Best Regards
Jeff Zhang
Re: Serialization issues with case classes with Flink
Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Thanks also to Som and Zahid for your answers. But as I show in my previous
answer, this issue happens without using the CSV source, and it doesn't
show in the Flink Scala shell, so it looks like an issue with the Zeppelin
interpreter for Flink.
On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:
> Hi Ravi,
>
> I posted another message with the minimal reproduction, I repeat it here:
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector is the problem.
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
> ```
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
> C(9), C(10))
>
> scala>
> ```
>
> Note I was running Zeppelin in a Docker container on my laptop, also using
> Flink's local mode.
>
> I think this is a problem with the Zeppelin integration with Flink, and how
> it processes case class definitions in a cell.
>
> Thanks for your answer,
>
> Juan
>
>
> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
> ravi.pullareddy@minlog.com.au> wrote:
>
>> Hi Juan
>>
>>
>>
>> I have written various applications for continuous processing of csv
>> files. Please post your entire code and how you are mapping. It becomes
>> easy to highlight the issue.
>>
>>
>>
>> Thanks
>>
>> Ravi Pullareddy
>>
>>
>>
>>
>>
>> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
>> *Sent:* Sunday, 19 April 2020 6:25 PM
>> *To:* users@zeppelin.apache.org
>> *Subject:* Re: Serialization issues with case classes with Flink
>>
>>
>>
>> Just for the record, the Spark version of that works fine:
>>
>>
>>
>> ```
>> %spark
>>
>> case class C2(x: Int)
>>
>> val xs = sc.parallelize(1 to 10)
>> val csSpark = xs.map{C2(_)}
>>
>> csSpark.collect
>>
>>
>>
>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
>> C2(8), C2(9), C2(10))
>>
>> ```
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Juan
>>
>>
>>
>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>> juan.rodriguez.hortala@gmail.com> wrote:
>>
>> Minimal reproduction:
>>
>> - First option
>>
>> ```scala
>>
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>>
>> ```
>>
>>
>>
>> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@39c713c6 cs:
>> org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object. at
>> scala.Predef$.require(Predef.scala:224) at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>> at
>> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>> ... 125 elided
>>
>> - Second option
>>
>> ```scala
>>
>> object Types {
>> case class C(x: Int)
>> }
>>
>> val cs2 = xs.map{Types.C(_)}
>>
>> cs2.count
>>
>> ```
>>
>>
>>
>> defined object Types org.apache.flink.api.common.InvalidProgramException:
>> Task not serializable at
>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>> at
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
>> Caused by: java.io.NotSerializableException:
>> org.apache.flink.api.scala.DataSet at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>
>>
>>
>> Greetings,
>>
>>
>>
>> Juan
>>
>>
>>
>>
>>
>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>> juan.rodriguez.hortala@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> I'm using the Flink interpreter and the benv environment. I'm reading
>> some csv files using benv.readCsvFile and it works ok. I have also defined
>> a case class C for the csv records. The problem happens when I apply a
>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>> convert it into a DataSet[C].
>>
>> - If I define the case class C in some cell I get this error:
>>
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object.
>>
>>
>>
>> - That sounds related to this
>> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>> it looks like the zeppelin flink interpreter is wrapping the case class
>> definition as an inner class. I tried defining the case class C inside an
>> object Types that I define in another cell. With that I also get a
>> serialization exception.
>>
>> org.apache.flink.api.common.InvalidProgramException: Task not
>> serializable
>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>
>>
>>
>> I guess that didn't work because the object Types is still defined inside
>> some class implicitly defined by the interpreter.
>>
>>
>>
>> Any thoughts about how I can fix this? Also, I understand $line163 etc.
>> refer to the code in the cells. Is there some convention I can use to
>> understand which line in the notebook those error messages refer to?
>>
>>
>>
>> Thanks in advance,
>>
>>
>>
>> Juan
>>
>>
Re: Serialization issues with case classes with Flink
Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Ravi,
I posted another message with the minimal reproduction, I repeat it here:
```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```
defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided
As you can see, this minimal example doesn't use CSV files, so I don't
think the CSV connector is the problem.
I don't think this is a Flink issue either, as that example works fine in
the Flink shell:
```
root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
scala> val xs = benv.fromCollection(1 to 10)
xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@6012bee8
scala> xs.collect
res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val cs = xs.map{C(_)}
cs: org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@36f807f5
scala> cs.collect
res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9),
C(10))
scala>
```
Note I was running Zeppelin in a Docker container on my laptop, also using
Flink's local mode.
I think this is a problem with the Zeppelin integration with Flink, and how
it processes case class definitions in a cell.
Thanks for your answer,
Juan
On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
ravi.pullareddy@minlog.com.au> wrote:
> Hi Juan
>
>
>
> I have written various applications for continuous processing of csv
> files. Please post your entire code and how you are mapping. It becomes
> easy to highlight the issue.
>
>
>
> Thanks
>
> Ravi Pullareddy
>
>
>
>
>
> *From:* Juan Rodríguez Hortalá <ju...@gmail.com>
> *Sent:* Sunday, 19 April 2020 6:25 PM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Just for the record, the Spark version of that works fine:
>
>
>
> ```
> %spark
>
> case class C2(x: Int)
>
> val xs = sc.parallelize(1 to 10)
> val csSpark = xs.map{C2(_)}
>
> csSpark.collect
>
>
>
> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
> C2(8), C2(9), C2(10))
>
> ```
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Minimal reproduction:
>
> - First option
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
> - Second option
>
> ```scala
>
> object Types {
> case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
>
> ```
>
>
>
> defined object Types org.apache.flink.api.common.InvalidProgramException:
> Task not serializable at
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> at
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> at
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>
>
> Greetings,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>
> Hi,
>
>
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a
> map operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
>
> - If I define the case class C in some cell I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
>
>
> - That sounds related to this
> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
> it looks like the zeppelin flink interpreter is wrapping the case class
> definition as an inner class. I tried defining the case class C inside an
> object Types that I define in another cell. With that I also get a
> serialization exception.
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
>
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
>
>
> Any thoughts about how I can fix this? Also, I understand $line163 etc.
> refer to the code in the cells. Is there some convention I can use to
> understand which line in the notebook those error messages refer to?
>
>
>
> Thanks in advance,
>
>
>
> Juan
>
>
RE: Serialization issues with case classes with Flink
Posted by Ravi Pullareddy <ra...@minlog.com.au>.
Hi Juan
I have written various applications for continuous processing of csv files.
Please post your entire code and how you are mapping. It becomes easy to
highlight the issue.
Thanks
Ravi Pullareddy
*From:* Juan Rodríguez Hortalá <ju...@gmail.com>
*Sent:* Sunday, 19 April 2020 6:25 PM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink
Just for the record, the Spark version of that works fine:
```
%spark
case class C2(x: Int)
val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}
csSpark.collect
res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))
```
Thanks,
Juan
On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:
Minimal reproduction:
- First option
```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```
defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@39c713c6 cs:
org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object. at
scala.Predef$.require(Predef.scala:224) at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
at
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
... 125 elided
- Second option
```scala
object Types {
case class C(x: Int)
}
val cs2 = xs.map{Types.C(_)}
cs2.count
```
defined object Types org.apache.flink.api.common.InvalidProgramException:
Task not serializable at
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
Caused by: java.io.NotSerializableException:
org.apache.flink.api.scala.DataSet at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
Greetings,
Juan
On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
juan.rodriguez.hortala@gmail.com> wrote:
Hi,
I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].
- If I define the case class C in some cell I get this error:
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.
- That sounds related to this
https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
it looks like the zeppelin flink interpreter is wrapping the case class
definition as an inner class. I tried defining the case class C inside an
object Types that I define in another cell. With that I also get a
serialization exception.
org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.
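The outer-instance requirement can be reproduced in plain Scala, without
Flink; in this sketch (class names are illustrative, not from Zeppelin), a
case class nested in a class compiles to a constructor that takes the
enclosing instance as an extra parameter, which is exactly what the
serializer cannot supply:

```scala
// Plain Scala sketch (no Flink): a case class nested in a class keeps a
// reference to its outer instance, so its constructor needs one to be called.
class CellWrapper {                 // stands in for the interpreter's wrapper class
  case class C(x: Int)
}

object OuterDemo extends App {
  val w = new CellWrapper
  val c = w.C(1)
  // The compiled constructor takes the outer instance as its first parameter,
  // so reflective instantiation fails without a reference to it.
  val ctor = c.getClass.getConstructors.head
  println(ctor.getParameterTypes.map(_.getSimpleName).mkString(", "))
}
```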
Any thoughts on how I can fix this? Also, I understand that $line163 etc.
refer to the code in the cells; is there some convention I can use to work
out which line in the notebook those error messages refer to?
Thanks in advance,
Juan
Re: Serialization issues with case classes with Flink
Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Just for the record, the Spark version of that works fine:
```
%spark
case class C2(x: Int)
val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}
csSpark.collect
res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))
```
Thanks,
Juan
Re: Serialization issues with case classes with Flink
Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Minimal reproduction:
- First option
```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```
defined class C
xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a

java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 125 elided
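For anyone hitting the same thing, one hedged workaround sketch (not
verified in this thread; `benv` is the Zeppelin Flink batch environment
already shown above) is to keep the records as tuples, which the Flink
Scala API serializes without reflectively constructing a case class:

```scala
// Workaround sketch: tuples instead of a notebook-defined case class.
// Flink serializes Scala tuples natively, so no outer instance is needed.
val xs = benv.fromCollection(1 to 10)   // benv: Zeppelin's Flink batch environment
val ts = xs.map(x => (x, x * 2))        // Tuple2 instead of a case class
ts.count
```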
- Second option
```scala
object Types {
  case class C(x: Int)
}

val cs2 = xs.map{Types.C(_)}

cs2.count
```
defined object Types

org.apache.flink.api.common.InvalidProgramException: Task not serializable
  at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
  at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
  at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
  at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
  at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
  at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
  ... 106 elided
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
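The NotSerializableException on org.apache.flink.api.scala.DataSet suggests
the closure is dragging in the interpreter's wrapper object, which holds the
`xs` DataSet as a field. The same capture pattern can be sketched in plain
Scala (names here are illustrative, not Zeppelin's actual internals):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Sketch: referring to `Types` (a member of an enclosing instance) from a
// closure captures that whole instance, including non-serializable fields.
class ReplWrapper {                      // stands in for the interpreter wrapper
  val dataSetLike = new Object           // non-serializable field, like a DataSet
  object Types { case class C(x: Int) }
  def mapper: Int => Types.C = Types.C(_)  // closes over Types, hence over `this`
}

object CaptureDemo extends App {
  val f = (new ReplWrapper).mapper
  val out = new ObjectOutputStream(new ByteArrayOutputStream)
  try out.writeObject(f)
  catch { case e: NotSerializableException => println("Task not serializable: " + e) }
}
```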
Greetings,
Juan
Re: Serialization issues with case classes with Flink
Posted by Zahid Rahman <za...@gmail.com>.
You may also want to look at this
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
Re: Serialization issues with case classes with Flink
Posted by Som Lima <so...@gmail.com>.
You may wish to have a look at the Apache Avro project.
It shows you how to generate source code for schema classes, creating
objects to be used when reading CSV files etc., for the purposes of
serialization and deserialization.
https://avro.apache.org/docs/current/gettingstartedjava.html
The Avro project is mentioned in the Flink documentation.
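As a rough sketch of what that involves (the schema below is invented for
illustration, not taken from the thread), you write a schema file such as
`record.avsc` and let avro-tools (`compile schema`) generate the record class:

```json
{
  "namespace": "example.records",
  "type": "record",
  "name": "CsvRecord",
  "fields": [
    {"name": "x", "type": "int"},
    {"name": "label", "type": "string"}
  ]
}
```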