Posted to issues@spark.apache.org by "Lukas Rytz (JIRA)" <ji...@apache.org> on 2017/08/09 07:35:01 UTC
[jira] [Comment Edited] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
[ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119524#comment-16119524 ]
Lukas Rytz edited comment on SPARK-14540 at 8/9/17 7:34 AM:
------------------------------------------------------------
[~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class still takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler checks whether that parameter is actually used and, if it is not, passes {{null}} as the argument instead of the enclosing instance.
Example code:
{code}
class C {
  def foo(f: String => Object) = 0
  def bar = {
    foo { x: Any => new Object{} }
  }
}
{code}
{noformat}
$> scalac -version
Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
$> scalac Test.scala -Xprint:cleanup,delambdafy
[[syntax trees at end of cleanup]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      ((x: Object) => C.this.$anonfun|$1(x))
    });
    final <artifact> private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}
[[syntax trees at end of delambdafy]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      $anonfun()
    });
    final <static> <artifact> def $anonfun|$1(x: Object): Object = new <$anon: Object>(null);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}
$> javap -v -cp . C
...
  public static final java.lang.Object $anonfun$bar$1(java.lang.Object);
    descriptor: (Ljava/lang/Object;)Ljava/lang/Object;
    flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC
    Code:
      stack=3, locals=1, args_size=1
         0: new #10 // class C$$anon$1
         3: dup
         4: aconst_null
         5: invokespecial #51 // Method C$$anon$1."<init>":(LC;)V
         8: areturn
...
{noformat}
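The compiler output above shows that the lambda body no longer receives {{C.this}}. One way to confirm this at runtime is to push the closure through Java serialization. The following is only a minimal sketch, assuming Scala 2.12.3 or later and compilation with scalac rather than the REPL. Unlike the example above, {{foo}} returns the closure instead of {{0}} so that it can be inspected, and the {{SerializableCheck}} object and its {{serialize}} helper are hypothetical names introduced for illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// C is deliberately NOT Serializable: if the closure captured C.this,
// writing it out would fail with java.io.NotSerializableException.
class C {
  // Changed from the original example: returns the closure so the caller can inspect it.
  def foo(f: String => Object): String => Object = f
  def bar: String => Object = foo { (x: Any) => new Object {} }
}

object SerializableCheck {
  // Serializes obj with plain Java serialization and returns the resulting bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val closure = new C().bar
    // Throws if the closure (or anything it captures) is not serializable.
    val bytes = serialize(closure)
    println(s"closure serialized to ${bytes.length} bytes")
  }
}
```

If the compiler had passed {{C.this}} as the outer argument and the lambda had captured it, {{writeObject}} would be expected to throw instead of returning.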
> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> ----------------------------------------------------------------
>
> Key: SPARK-14540
> URL: https://issues.apache.org/jira/browse/SPARK-14540
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Reporter: Josh Rosen
>
> Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures:
> {code}
> [info] - toplevel return statements in closures are identified at cleaning time *** FAILED *** (32 milliseconds)
> [info] Expected exception org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no exception was thrown. (ClosureCleanerSuite.scala:57)
> {code}
> and
> {code}
> [info] - user provided closures are actually cleaned *** FAILED *** (56 milliseconds)
> [info] Expected ReturnStatementInClosureException, but got org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.lang.Object
> [info] - element of array (index: 0)
> [info] - array (class "[Ljava.lang.Object;", size: 1)
> [info] - field (class "java.lang.invoke.SerializedLambda", name: "capturedArgs", type: "class [Ljava.lang.Object;")
> [info] - object (class "java.lang.invoke.SerializedLambda", SerializedLambda[capturingClass=class org.apache.spark.util.TestUserClosuresActuallyCleaned$, functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I, implementation=invokeStatic org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I, instantiatedMethodType=(I)I, numCaptured=1])
> [info] - element of array (index: 0)
> [info] - array (class "[Ljava.lang.Object;", size: 1)
> [info] - field (class "java.lang.invoke.SerializedLambda", name: "capturedArgs", type: "class [Ljava.lang.Object;")
> [info] - object (class "java.lang.invoke.SerializedLambda", SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
> [info] - field (class "org.apache.spark.rdd.MapPartitionsRDD", name: "f", type: "interface scala.Function3")
> [info] - object (class "org.apache.spark.rdd.MapPartitionsRDD", MapPartitionsRDD[2] at apply at Transformer.scala:22)
> [info] - field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
> [info] - root object (class "scala.Tuple2", (MapPartitionsRDD[2] at apply at Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)).
> [info] This means the closure provided by user is not actually cleaned. (ClosureCleanerSuite.scala:78)
> {code}
> We'll need to figure out a closure cleaning strategy which works for 2.12 lambdas.
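The serialization trace above shows that a 2.12 closure is written out as a {{java.lang.invoke.SerializedLambda}} whose {{capturedArgs}} hold the captured values. A cleaning strategy would therefore need some way to look at that representation. The sketch below shows one possible starting point and is not Spark's implementation: it assumes the closure is a serializable lambda that carries the synthetic {{writeReplace}} method, and the names {{LambdaInspector}}, {{serializedLambda}} and {{makeAdder}} are hypothetical.

```scala
import java.lang.invoke.SerializedLambda

object LambdaInspector {
  // Returns the SerializedLambda form of a closure, or None when the object
  // has no writeReplace method or it does not produce a SerializedLambda.
  def serializedLambda(closure: AnyRef): Option[SerializedLambda] =
    try {
      val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
      writeReplace.setAccessible(true)
      writeReplace.invoke(closure) match {
        case sl: SerializedLambda => Some(sl)
        case _                    => None
      }
    } catch {
      case _: NoSuchMethodException => None
    }

  // Hypothetical helper: builds a closure that captures its parameter.
  def makeAdder(offset: Int): Int => Int = x => x + offset

  def main(args: Array[String]): Unit = {
    serializedLambda(makeAdder(10)).foreach { sl =>
      println(s"implementation: ${sl.getImplClass}.${sl.getImplMethodName}")
      println(s"captured argument count: ${sl.getCapturedArgCount}")
    }
  }
}
```

The captured arguments reported here correspond to the {{capturedArgs}} array in the trace, which is where the non-serializable {{java.lang.Object}} appears in the failing test.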