Posted to issues@spark.apache.org by "Frederick Reiss (JIRA)" <ji...@apache.org> on 2015/05/05 01:48:06 UTC

[jira] [Commented] (SPARK-7043) KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line

    [ https://issues.apache.org/jira/browse/SPARK-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527575#comment-14527575 ] 

Frederick Reiss commented on SPARK-7043:
----------------------------------------

I did some looking into this issue. Here's a summary of what I've found:

The interpreter rewrites each command into a pair of generated classes, one for the "read" phase and one for the "eval" phase of executing the statement.

For the command:
{{case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()}}
this rewrite process produces the following class (simplified here by removing empty constructors) to cover the "read" phase:

{noformat}
class $read extends Serializable {
  class $iwC extends Serializable {
    val $VAL10 = $line3.$read.INSTANCE;
    import $VAL10.$iw.$iw.sc;
    class $iwC extends Serializable {
      case class Foo extends scala.Product with scala.Serializable {
        <caseaccessor> <paramaccessor> val i: Int = _;
      };
      val ret = sc.parallelize(1.to(100).map(Foo), 10).collect()
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}
{noformat}

Note the lines:
{{val $VAL10 = $line3.$read.INSTANCE;}}
{{import $VAL10.$iw.$iw.sc;}}
These lines pull the symbol {{sc}} into the scope of the class {{$read.$iwC}}. They also give the class {{$read.$iwC}} a field ({{$VAL10}}) holding a reference into the object graph under {{$line3.$read}}. I haven't been able to get at the definition of the generated class or object {{$line3}}, but it appears that {{$line3}} is a rewritten version of the Scala code
{noformat}
@transient val sc = {
  val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
  println("Spark context available as sc.")
  _sc
}
{noformat}
which runs whenever the interpreter starts up.
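
Judging from the import path {{$VAL10.$iw.$iw.sc}}, the rewritten form of {{$line3.$read}} presumably follows the same nesting pattern as the block above. An illustrative guess (not actual compiler output):
{noformat}
class $read extends Serializable {
  class $iwC extends Serializable {
    class $iwC extends Serializable {
      @transient val sc = {
        val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
        println("Spark context available as sc.")
        _sc
      }
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}
{noformat}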

The generated code for the current line gets further bundled inside a generated object or class (I'm not sure which) that represents the current line's state transitions. In the trace I'm looking at, this object/class is called {{$line19}}.

As a result, the case class {{Foo}} from the command line turns into the case class {{$line19.$read.$iwC.$iwC.Foo}}, which the Scala compiler turns into a Java inner class by the same name.

The call to {{sc.parallelize(1.to(100).map(Foo), 10)}} causes Spark to serialize an array of instances of {{$line19.$read.$iwC.$iwC.Foo}}. The serializer writes out each of these inner-class objects along with its enclosing ("outer") instances. During this serialization process, the serializer skips the pointer to the SparkContext, since that field is marked transient.

Inside the call to {{collect()}}, the deserializer reconstructs instances of {{$line19.$read.$iwC.$iwC.Foo}}. Before creating such an instance, the deserializer needs to create an enclosing instance of {{$line19.$read.$iwC.$iwC}}, which in turn needs an enclosing instance of {{$line19.$read.$iwC}}. The class {{$line19.$read.$iwC}} contains a field ({{val $VAL10 = $line3.$read.INSTANCE}}) that points to an object of type {{$line3.$read}}; somewhere underneath that object, in the nested {{$iwC}} instances, sits the transient field {{sc}}.
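
To make the chain concrete, the references involved look roughly like this (field names are taken from the traces above; the exact depth of the {{$iwC}} nesting under {{$line3}} is a guess):
{noformat}
$line19.$read.$iwC.$iwC.Foo             <- object being deserialized
  $outer -> $line19.$read.$iwC.$iwC     <- enclosing instance
    $outer -> $line19.$read.$iwC
      $VAL10 -> $line3.$read (INSTANCE)
        $iw.$iw -> $line3.$read.$iwC.$iwC
          @transient sc                 <- initializer calls createSparkContext()
{noformat}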

At some point while initializing the object of type {{$line3.$read.$iwC}}, the Kryo deserializer invokes the initializer of the transient field {{sc}}. That initializer calls {{org.apache.spark.repl.Main.interp.createSparkContext()}}, which attempts, and fails, to create a second SparkContext. The deserialization process then fails with an exception.
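
This behavior is easy to reproduce with Kryo by itself (a minimal sketch, not Spark's actual KryoSerializer setup; it assumes Kryo's default instantiator strategy, which creates objects through the zero-arg constructor, and {{Holder}} is a made-up stand-in class):
{noformat}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream

// Holder stands in for the generated class that declares the transient sc field.
class Holder {
  @transient val sc: AnyRef = {
    println("initializer ran")        // analogous to createSparkContext()
    new Object
  }
}

object KryoDemo extends App {
  val kryo = new Kryo()
  kryo.setRegistrationRequired(false)

  val bos = new ByteArrayOutputStream()
  val out = new Output(bos)
  kryo.writeObject(out, new Holder)   // prints "initializer ran" once (construction)
  out.close()

  kryo.readObject(new Input(bos.toByteArray), classOf[Holder])
  // Prints "initializer ran" a second time: Kryo builds the new instance
  // through the zero-arg constructor, so the side-effecting initializer reruns.
}
{noformat}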

I'm not sure exactly why the Java deserializer doesn't crash in the same way. I suspect that, when deserializing the transient field {{sc}}, Java's deserializer simply sets the field to null and moves on.
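
That hypothesis is easy to check outside Spark (another minimal sketch; {{Read}}, {{IwC}}, and {{Foo}} here are made-up stand-ins that mimic the nesting above, not the actual generated classes):
{noformat}
import java.io._

class Read extends Serializable {
  @transient val sc: AnyRef = {
    println("initializer ran")   // would be createSparkContext() in the REPL
    new Object                   // not Serializable, but transient, so never written
  }
  class IwC extends Serializable {
    case class Foo(i: Int)       // carries an $outer pointer to its enclosing IwC
  }
  val iw = new IwC
}

object JavaSerDemo extends App {
  val r = new Read               // prints "initializer ran" once, at construction
  val foo = r.iw.Foo(42)

  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(foo)           // serializes foo plus its chain of enclosing
  oos.close()                    // instances, skipping the transient sc field

  val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  val back = in.readObject()
  // Nothing is printed here: Java's deserializer leaves the transient field
  // at its default value (null) rather than rerunning the initializer.
}
{noformat}
In this sketch the initializer runs exactly once, at construction; after deserialization the transient field is simply null, and the side effect never reruns.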

I'm not sure which way of initializing the transient field is correct. The serialization spec ([http://docs.oracle.com/javase/7/docs/platform/serialization/spec/serialTOC.html]) is vague about default values for transient fields.


> KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7043
>                 URL: https://issues.apache.org/jira/browse/SPARK-7043
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell
>    Affects Versions: 1.3.1
>         Environment: Ubuntu 14.04, no hadoop
>            Reporter: Peng Cheng
>            Priority: Minor
>              Labels: classloader, kryo
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When spark-shell is deployed with the
> "spark.serializer=org.apache.spark.serializer.KryoSerializer" option, it cannot execute the following code (on a single line):
>     case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
> This problem does not occur with JavaSerializer, or when the code is split into two lines. The only possible explanation is that KryoSerializer is using a ClassLoader that is not registered as a subsidiary ClassLoader of the one in the REPL.
> A "dirty" fix would be to simply break the input at semicolons, but it's better to fix the ClassLoader to avoid other liabilities.


