You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "SmedbergM (JIRA)" <ji...@apache.org> on 2017/06/07 20:04:18 UTC

[jira] [Updated] (FLINK-6866) ClosureCleaner.clean fails for scala's JavaConverters wrapper classes

     [ https://issues.apache.org/jira/browse/FLINK-6866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

SmedbergM updated FLINK-6866:
-----------------------------
    Description: 
MWE:

```
import scala.collection.JavaConverters._
import org.apache.flink.api.java.ClosureCleaner

object SerializationFailureMWE extends App {
  val m4j: java.util.Map[String,String] = new java.util.HashMap
  m4j.put("key1", "value1")

  val m: java.util.Map[String,String] = Map(
    "key1" -> "value1"
  ).asJava

  println("Cleaning native Java map")

  ClosureCleaner.clean(m4j, true)

  println("Cleaning map converted by JavaConverters")

  ClosureCleaner.clean(m, true)
}
```

Program output:
```
Cleaning native Java map
Cleaning map converted by JavaConverters
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: {key1=value1} is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
	at SerializationFailureMWE$delayedInit$body.apply(SerializationFailureMWE.scala:17)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
	at scala.App$class.main(App.scala:71)
	at SerializationFailureMWE$.main(SerializationFailureMWE.scala:5)
	at SerializationFailureMWE.main(SerializationFailureMWE.scala)
Caused by: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
...
```

  was:
MWE:

```
import scala.collection.JavaConverters._
import org.apache.flink.api.java.ClosureCleaner

object SerializationFailureMWE extends App {
  val m4j: java.util.Map[String,String] = new java.util.HashMap
  m4j.put("key1", "value1")

  val m: java.util.Map[String,String] = Map(
    "key1" -> "value1"
  ).asJava

  println("Cleaning native Java map")
  ClosureCleaner.clean(m4j, true)

  println("Cleaning map converted by JavaConverters")
  ClosureCleaner.clean(m, true)
}
```

Program output:
```
Cleaning native Java map
Cleaning map converted by JavaConverters
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: {key1=value1} is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
	at SerializationFailureMWE$delayedInit$body.apply(SerializationFailureMWE.scala:17)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
	at scala.App$class.main(App.scala:71)
	at SerializationFailureMWE$.main(SerializationFailureMWE.scala:5)
	at SerializationFailureMWE.main(SerializationFailureMWE.scala)
Caused by: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
...
```


> ClosureCleaner.clean fails for scala's JavaConverters wrapper classes
> ---------------------------------------------------------------------
>
>                 Key: FLINK-6866
>                 URL: https://issues.apache.org/jira/browse/FLINK-6866
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: SmedbergM
>
> MWE:
> ```
> import scala.collection.JavaConverters._
> import org.apache.flink.api.java.ClosureCleaner
> object SerializationFailureMWE extends App {
>   val m4j: java.util.Map[String,String] = new java.util.HashMap
>   m4j.put("key1", "value1")
>   val m: java.util.Map[String,String] = Map(
>     "key1" -> "value1"
>   ).asJava
>   println("Cleaning native Java map")
>   ClosureCleaner.clean(m4j, true)
>   println("Cleaning map converted by JavaConverters")
>   ClosureCleaner.clean(m, true)
> }
> ```
> Program output:
> ```
> Cleaning native Java map
> Cleaning map converted by JavaConverters
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: {key1=value1} is not serializable. The object probably contains or references non serializable fields.
> 	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> 	at SerializationFailureMWE$delayedInit$body.apply(SerializationFailureMWE.scala:17)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:71)
> 	at scala.App$$anonfun$main$1.apply(App.scala:71)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
> 	at scala.App$class.main(App.scala:71)
> 	at SerializationFailureMWE$.main(SerializationFailureMWE.scala:5)
> 	at SerializationFailureMWE.main(SerializationFailureMWE.scala)
> Caused by: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
> ...
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)