Posted to issues@spark.apache.org by "Andras Nemeth (JIRA)" <ji...@apache.org> on 2014/08/15 17:30:19 UTC

[jira] [Updated] (SPARK-3070) Kryo deserialization without using the custom registrator

     [ https://issues.apache.org/jira/browse/SPARK-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andras Nemeth updated SPARK-3070:
---------------------------------

    Description: 
If an RDD partition is cached on executor1 and used by a task on executor2, the partition has to be serialized and sent over. For this particular serialization/deserialization use case, when using Kryo, the custom registrator does not appear to be used on the deserialization side. This results in misleading Kryo deserialization errors.

The cause seems to be that the thread running this deserialization has a classloader that does not have the jars specified in the SparkConf on its classpath. It therefore fails to load the registrator with a ClassNotFoundException, but it catches the exception and silently continues without a registrator. (A bug in its own right, in my opinion.)
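
The difference between swallowing and surfacing the load failure can be sketched without Spark. This is only an illustration under stated assumptions, not Spark's actual code: the object `RegistratorLoading`, its helper methods, and the class name `com.example.MissingRegistrator` are hypothetical, and the system classloader stands in for a loader that lacks the application jar.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical illustration; none of these names come from Spark.
object RegistratorLoading {
  // Resolve a class by name through an explicit classloader.
  def loadClass(className: String, loader: ClassLoader): Try[Class[_]] =
    Try(Class.forName(className, true, loader))

  // Lenient variant: drops the failure, similar to the behavior described above.
  def loadLeniently(className: String, loader: ClassLoader): Option[Class[_]] =
    loadClass(className, loader).toOption

  // Strict variant: rethrows with the original exception attached as the cause.
  def loadStrictly(className: String, loader: ClassLoader): Class[_] =
    loadClass(className, loader) match {
      case Success(cls) => cls
      case Failure(e) =>
        throw new IllegalStateException(
          s"Failed to load registrator class $className", e)
    }

  def main(args: Array[String]): Unit = {
    // Assumption: this loader does not know the (hypothetical) registrator class.
    val loader = ClassLoader.getSystemClassLoader
    val name = "com.example.MissingRegistrator"

    // The lenient call hides the problem; the caller just sees "no class".
    println(loadLeniently(name, loader))

    // The strict call reports the ClassNotFoundException as the cause.
    try loadStrictly(name, loader)
    catch {
      case e: IllegalStateException =>
        println(e.getMessage + " caused by " + e.getCause)
    }
  }
}
```

With the strict variant, a missing jar would show up as a class-loading error rather than as an unrelated-looking deserialization failure later on.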

To reproduce, create two RDDs partitioned the same way (that is, with the same partitioner) whose corresponding partitions are cached on different machines, then join them. The program below is a somewhat convoluted way to achieve this. Running it on a Spark cluster with two workers, each with one core, should trigger the bug. It runs two counts in parallel, which makes the two RDDs get computed in parallel and, as a consequence, on different executors.

{code:java}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import scala.actors.Actor

case class MyClass(a: Int)

// Custom registrator that should be applied on every executor.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass])
  }
}

// Runs rdd.count on its own thread so that two counts can overlap.
class CountActor(rdd: RDD[_]) extends Actor {
  def act() {
    println("Start count")
    println(rdd.count)
    println("Stop count")
  }
}

object KryBugExample  {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setMaster(args(0))
      .setAppName("KryBugExample")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyKryoRegistrator")
      .setJars(Seq("target/scala-2.10/krybugexample_2.10-0.1-SNAPSHOT.jar"))
    val sc = new SparkContext(sparkConf)

    // Both RDDs share one single-partition partitioner and are cached.
    val partitioner = new HashPartitioner(1)
    val rdd1 = sc
      .parallelize((0 until 100000).map(i => (i, MyClass(i))), 1)
      .partitionBy(partitioner).cache
    val rdd2 = sc
      .parallelize((0 until 100000).map(i => (i, MyClass(i * 2))), 1)
      .partitionBy(partitioner).cache
    // Start both counts concurrently so the partitions get cached on different executors.
    new CountActor(rdd1).start
    new CountActor(rdd2).start
    // The join needs both cached partitions in one task, so one of them is fetched remotely.
    println(rdd1.join(rdd2).count)
    // Busy-wait so the driver does not exit.
    while (true) {}
  }
}
{code}
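
The reproduction relies on `scala.actors`, which is deprecated in newer Scala versions. As a rough sketch of the same "start two jobs at once" idea, futures could be used instead. The object `ParallelCounts` and the helper `runConcurrently` are hypothetical names, and the jobs in `main` are plain-Scala stand-ins for `rdd1.count` and `rdd2.count`, so this does not reproduce the bug by itself.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper; not part of the original reproduction.
object ParallelCounts {
  // Submits every job to the global execution context, then waits for all results.
  // Results come back in the same order as the jobs were given.
  def runConcurrently[A](jobs: Seq[() => A], timeout: Duration): Seq[A] = {
    val futures = jobs.map(job => Future(job()))
    Await.result(Future.sequence(futures), timeout)
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for rdd1.count and rdd2.count (assumption: no Spark on the classpath).
    val jobs = Seq(
      () => (0 until 100000).count(_ % 2 == 0),
      () => (0 until 100000).count(_ % 3 == 0)
    )
    println(runConcurrently(jobs, 1.minute))
  }
}
```

Whether the jobs actually overlap depends on the thread pool: the global execution context sizes itself from the number of available processors, so on a single-core driver a dedicated two-thread pool would be the safer choice.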

  was:
If an RDD partition is cached on executor1 and used by a task on executor2, the partition has to be serialized and sent over. For this particular serialization/deserialization use case, when using Kryo, the custom registrator does not appear to be used on the deserialization side. This results in misleading Kryo deserialization errors.

The cause seems to be that the thread running this deserialization has a classloader that does not have the jars specified in the SparkConf on its classpath. It therefore fails to load the registrator with a ClassNotFoundException, but it catches the exception and silently continues without a registrator. (A bug in its own right, in my opinion.)

To reproduce, create two RDDs partitioned the same way (that is, with the same partitioner) whose corresponding partitions are cached on different machines, then join them. The program below is a somewhat convoluted way to achieve this. Running it on a Spark cluster with two workers, each with one core, should trigger the bug. It runs two counts in parallel, which makes the two RDDs get computed in parallel and, as a consequence, on different executors.

{code:scala}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import scala.actors.Actor

case class MyClass(a: Int)

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass])
  }
}

class CountActor(rdd: RDD[_]) extends Actor {
  def act() {
    println("Start count")
    println(rdd.count)
    println("Stop count")
  }
}

object KryBugExample  {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setMaster(args(0))
      .setAppName("KryBugExample")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyKryoRegistrator")
      .setJars(Seq("target/scala-2.10/krybugexample_2.10-0.1-SNAPSHOT.jar"))
    val sc = new SparkContext(sparkConf)

    val partitioner = new HashPartitioner(1)
    val rdd1 = sc
      .parallelize((0 until 100000).map(i => (i, MyClass(i))), 1)
      .partitionBy(partitioner).cache
    val rdd2 = sc
      .parallelize((0 until 100000).map(i => (i, MyClass(i * 2))), 1)
      .partitionBy(partitioner).cache
    new CountActor(rdd1).start
    new CountActor(rdd2).start
    println(rdd1.join(rdd2).count)
    while (true) {}
  }
}
{code}


> Kryo deserialization without using the custom registrator
> ---------------------------------------------------------
>
>                 Key: SPARK-3070
>                 URL: https://issues.apache.org/jira/browse/SPARK-3070
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.2
>            Reporter: Andras Nemeth


