Posted to issues@spark.apache.org by "Rob Russo (Jira)" <ji...@apache.org> on 2019/10/17 09:09:00 UTC

[jira] [Created] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field

Rob Russo created SPARK-29497:
---------------------------------

             Summary: Cannot assign instance of java.lang.invoke.SerializedLambda to field
                 Key: SPARK-29497
                 URL: https://issues.apache.org/jira/browse/SPARK-29497
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.4.3
         Environment: Spark 2.4.3 Scala 2.12
            Reporter: Rob Russo


Note: this applies to Scala 2.12.

There seems to be an issue in Spark with serializing a UDF that is created from a function assigned to a class member (a val) when that function references another function assigned to a class member. This is similar to https://issues.apache.org/jira/browse/SPARK-25047, but the resolution of that issue does not appear to cover this case. After trimming the problem down to its core, I came up with the following reproduction:

{code:java}
// Run in spark-shell, which pre-imports org.apache.spark.sql.functions._ and spark.implicits._
object TestLambdaShell extends Serializable {
  // A function value stored in a member.
  val hello: String => String = s => s"hello $s!"
  // A function value (val) that calls the other member: this is the failing case.
  val lambdaTest: String => String = hello( _ )
  // The same body exposed as a method (def): this works.
  def functionTest: String => String = hello( _ )
}

val hello = udf( TestLambdaShell.hello )
val functionTest = udf( TestLambdaShell.functionTest )
val lambdaTest = udf( TestLambdaShell.lambdaTest )

sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
{code}
 

All of these work except the last line, which fails with the following exception on the executors:

 
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{code}
 

In Spark 2.2.x I used a class with a similar structure and it worked fine. After upgrading to Scala 2.12 we ran into a few serialization issues in places; most of them were solved by extending Serializable, but this case was not fixed by that.

 

Also, this happens regardless of whether the code is run in the shell or from a jar.
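
Since the trace above fails inside JavaSerializerInstance.deserialize, which reads the task through a plain java.io.ObjectInputStream, a quick local check might be to round-trip the function through Java serialization without Spark. The following is only a minimal sketch and not part of the original reproduction: SerializationCheck and roundTrip are hypothetical names, and it has not been verified that this triggers the same exception outside Spark (class loading in the shell may behave differently).

{code:scala}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper (not part of Spark): serialize a value with plain
// Java serialization and read it back from the resulting bytes.
object SerializationCheck {
  def roundTrip[T <: AnyRef](value: T): T = {
    // Write the value into an in-memory buffer.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    // Read it back; any deserialization failure surfaces here as an exception.
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
{code}

For example, SerializationCheck.roundTrip(TestLambdaShell.lambdaTest)("world") would exercise the failing member from the reproduction, and the same call with TestLambdaShell.functionTest gives a point of comparison.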


