Posted to issues@spark.apache.org by "Lukas Rytz (JIRA)" <ji...@apache.org> on 2017/08/09 07:35:01 UTC

[jira] [Comment Edited] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner

    [ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119524#comment-16119524 ] 

Lukas Rytz edited comment on SPARK-14540 at 8/9/17 7:34 AM:
------------------------------------------------------------

[~joshrosen] the closure in your last example is serializable with 2.12.3. The anonymous class constructor still takes an outer parameter, but since https://github.com/scala/scala/pull/5099 the compiler analyzes whether that outer reference is actually used; when it is not, it passes {{null}} as the argument instead of {{C.this}}.

Example code
{code}
class C {
  def foo(f: String => Object) = 0
  def bar = {
    foo { x: Any => new Object{} }
  }
}
{code}

{noformat}
$> scalac -version
Scala compiler version 2.12.3 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
$> scalac Test.scala -Xprint:cleanup,delambdafy
[[syntax trees at end of                   cleanup]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      ((x: Object) => C.this.$anonfun|$1(x))
    });
    final <artifact> private[this] def $anonfun|$1(x: Object): Object = new <$anon: Object>(C.this);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}

[[syntax trees at end of                delambdafy]] // Test.scala
package <empty> {
  class C extends Object {
    def foo(f: Function1): Int = 0;
    def bar(): Int = C.this.foo({
      $anonfun()
    });
    final <static> <artifact> def $anonfun|$1(x: Object): Object = new <$anon: Object>(null);
    def <init>(): C = {
      C.super.<init>();
      ()
    }
  };
  final class anon$1 extends Object {
    def <init>($outer: C): <$anon: Object> = {
      anon$1.super.<init>();
      ()
    }
  }
}

$> javap -v -cp . C
...
  public static final java.lang.Object $anonfun$bar$1(java.lang.Object);
    descriptor: (Ljava/lang/Object;)Ljava/lang/Object;
    flags: ACC_PUBLIC, ACC_STATIC, ACC_FINAL, ACC_SYNTHETIC
    Code:
      stack=3, locals=1, args_size=1
         0: new           #10                 // class C$$anon$1
         3: dup
         4: aconst_null
         5: invokespecial #51                 // Method C$$anon$1."<init>":(LC;)V
         8: areturn
...
{noformat}
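
The effect of the {{null}} outer argument can also be checked at runtime. The following is a minimal sketch, not taken from Spark or from the compiler tests: {{SerializabilityCheck}}, {{isSerializable}} and {{capturesThis}} are hypothetical names introduced here for illustration, and it assumes Scala 2.12 or later, where lambdas go through {{java.lang.invoke.SerializedLambda}} when serialized. It pushes the closure from the example through plain Java serialization and contrasts it with a closure that really does use its enclosing, non-serializable instance.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializabilityCheck {
  // Hypothetical helper: true if `obj` can be written with Java serialization.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}

// Deliberately not Serializable, like the class C in the example above.
class C {
  def foo(f: String => Object): Boolean = SerializabilityCheck.isSerializable(f)

  // Same shape as the original example: the anonymous class never uses its outer C.
  def bar: Boolean = foo { (x: Any) => new Object {} }

  // Contrast case (an assumption added for illustration): the body uses `this`,
  // so the lambda has to capture the non-serializable C instance.
  def capturesThis: Boolean = foo { (x: Any) => this.toString }
}

object Main {
  def main(args: Array[String]): Unit = {
    val c = new C
    println(s"bar serializable:          ${c.bar}")
    println(s"capturesThis serializable: ${c.capturesThis}")
  }
}
```

If the analysis described above applies, {{bar}} should report a serializable closure while {{capturesThis}} should not, because only the latter carries a reference to {{C}}.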



> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> ----------------------------------------------------------------
>
>                 Key: SPARK-14540
>                 URL: https://issues.apache.org/jira/browse/SPARK-14540
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>            Reporter: Josh Rosen
>
> Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures:
> {code}
> [info] - toplevel return statements in closures are identified at cleaning time *** FAILED *** (32 milliseconds)
> [info]   Expected exception org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no exception was thrown. (ClosureCleanerSuite.scala:57)
> {code}
> and
> {code}
> [info] - user provided closures are actually cleaned *** FAILED *** (56 milliseconds)
> [info]   Expected ReturnStatementInClosureException, but got org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.lang.Object
> [info]   	- element of array (index: 0)
> [info]   	- array (class "[Ljava.lang.Object;", size: 1)
> [info]   	- field (class "java.lang.invoke.SerializedLambda", name: "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]   	- object (class "java.lang.invoke.SerializedLambda", SerializedLambda[capturingClass=class org.apache.spark.util.TestUserClosuresActuallyCleaned$, functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I, implementation=invokeStatic org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I, instantiatedMethodType=(I)I, numCaptured=1])
> [info]   	- element of array (index: 0)
> [info]   	- array (class "[Ljava.lang.Object;", size: 1)
> [info]   	- field (class "java.lang.invoke.SerializedLambda", name: "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]   	- object (class "java.lang.invoke.SerializedLambda", SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
> [info]   	- field (class "org.apache.spark.rdd.MapPartitionsRDD", name: "f", type: "interface scala.Function3")
> [info]   	- object (class "org.apache.spark.rdd.MapPartitionsRDD", MapPartitionsRDD[2] at apply at Transformer.scala:22)
> [info]   	- field (class "scala.Tuple2", name: "_1", type: "class java.lang.Object")
> [info]   	- root object (class "scala.Tuple2", (MapPartitionsRDD[2] at apply at Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)).
> [info]   This means the closure provided by user is not actually cleaned. (ClosureCleanerSuite.scala:78)
> {code}
> We'll need to figure out a closure cleaning strategy which works for 2.12 lambdas.
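
The serialization trace above fails inside the {{capturedArgs}} field of {{java.lang.invoke.SerializedLambda}}, so one way to see what a 2.12 lambda captures is to look at its serialized form directly. The following is a minimal sketch under stated assumptions, not Spark's {{ClosureCleaner}}: {{LambdaInspector}}, {{serializedForm}} and {{Holder}} are hypothetical names, and the code relies on serializable lambda classes exposing a private {{writeReplace}} method that returns a {{SerializedLambda}}.

```scala
import java.lang.invoke.SerializedLambda

object LambdaInspector {
  // Hypothetical helper: returns the SerializedLambda form of a lambda, or None
  // if the object has no writeReplace method (for example an anonymous class).
  def serializedForm(closure: AnyRef): Option[SerializedLambda] =
    try {
      val m = closure.getClass.getDeclaredMethod("writeReplace")
      m.setAccessible(true)
      m.invoke(closure) match {
        case sl: SerializedLambda => Some(sl)
        case _                    => None
      }
    } catch {
      case _: NoSuchMethodException => None
    }
}

// Deliberately not Serializable.
class Holder {
  val offset = 1
  // Reads this.offset, so the lambda captures the enclosing Holder.
  def capturing: Int => Int = x => x + offset
  // Uses nothing from the enclosing instance.
  def nonCapturing: Int => Int = x => x + 1
}

object InspectMain {
  def main(args: Array[String]): Unit = {
    val h = new Holder
    for (f <- Seq(h.capturing, h.nonCapturing)) {
      LambdaInspector.serializedForm(f).foreach { sl =>
        // getImplMethodName is the lifted $anonfun method; getCapturedArgCount
        // corresponds to numCaptured in the trace above.
        println(s"${sl.getImplMethodName}: captured=${sl.getCapturedArgCount}")
      }
    }
  }
}
```

A cleaning strategy for 2.12 lambdas would need to reason about these captured arguments rather than about {{$outer}} fields of anonymous function classes.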


