Posted to commits@spark.apache.org by we...@apache.org on 2020/07/29 04:02:54 UTC

[spark] branch branch-3.0 updated: [SPARK-32283][CORE] Kryo should support multiple user registrators

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9f18d54  [SPARK-32283][CORE] Kryo should support multiple user registrators
9f18d54 is described below

commit 9f18d54ab26d64a23438bd242e77ba27311495ec
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Wed Jul 29 03:58:03 2020 +0000

    [SPARK-32283][CORE] Kryo should support multiple user registrators
    
    ### What changes were proposed in this pull request?
    `spark.kryo.registrator` has a regression in 3.0. Since [SPARK-12080](https://issues.apache.org/jira/browse/SPARK-12080), it has supported multiple user registrators via
    ```scala
    private val userRegistrators = conf.get("spark.kryo.registrator", "")
        .split(',').map(_.trim)
        .filter(!_.isEmpty)
    ```
    But this no longer works in 3.0. Fix it by using `toSequence` in `Kryo.scala`.
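    
    For illustration, a minimal sketch of the user-facing behavior this restores (the `RegistratorA`/`RegistratorB` classes and the `pkg` package are hypothetical):
    ```scala
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator
    
    // Two independent registrators, e.g. contributed by different libraries.
    class RegistratorA extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = kryo.register(classOf[Array[Byte]])
    }
    class RegistratorB extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = kryo.register(classOf[Array[Long]])
    }
    
    // A comma-separated value should load both registrators.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "pkg.RegistratorA,pkg.RegistratorB")
    ```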
    
    ### Why are the changes needed?
    Previous Spark versions (2.x) supported multiple user registrators via
    ```scala
    private val userRegistrators = conf.get("spark.kryo.registrator", "")
        .split(',').map(_.trim)
        .filter(!_.isEmpty)
    ```
    But this doesn't work in 3.0, which is a regression.
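    
    A minimal sketch of the splitting behavior the sequence conf is expected to restore (simplified illustration; `splitRegistrators` is a hypothetical helper, not the actual `ConfigBuilder` code):
    ```scala
    // Comma-split, trim, and drop empty entries, matching the 2.x behavior quoted above.
    def splitRegistrators(raw: String): Seq[String] =
      raw.split(',').map(_.trim).filter(_.nonEmpty).toSeq
    
    assert(splitRegistrators("a.RegA, b.RegB,") == Seq("a.RegA", "b.RegB"))
    ```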
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #29123 from LantaoJin/SPARK-32283.
    
    Authored-by: LantaoJin <ji...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 26e6574d58429add645db820a83b70ef9dcd49fe)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/scala/org/apache/spark/internal/config/Kryo.scala |  3 ++-
 core/src/test/scala/org/apache/spark/SparkConfSuite.scala      |  2 +-
 .../test/scala/org/apache/spark/serializer/KryoBenchmark.scala |  2 +-
 .../org/apache/spark/serializer/KryoSerializerBenchmark.scala  |  2 +-
 .../spark/serializer/KryoSerializerDistributedSuite.scala      |  2 +-
 .../org/apache/spark/serializer/KryoSerializerSuite.scala      | 10 +++++-----
 .../apache/spark/serializer/SerializerPropertiesSuite.scala    |  2 +-
 .../apache/spark/sql/DatasetSerializerRegistratorSuite.scala   |  2 +-
 8 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
index 646d855..90c59b0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
@@ -29,7 +29,8 @@ private[spark] object Kryo {
   val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
     .version("0.5.0")
     .stringConf
-    .createOptional
+    .toSequence
+    .createWithDefault(Nil)
 
   val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
     .version("1.2.0")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 3bc2061..72e7ee0 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -221,7 +221,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     conf.registerKryoClasses(Array(classOf[Class1]))
     assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)
 
-    conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
+    conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[CustomRegistrator].getName))
 
     // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
     // blow up.
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
index fd228cd..525e682 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -125,7 +125,7 @@ object KryoBenchmark extends BenchmarkBase {
   def createSerializer(useUnsafe: Boolean): SerializerInstance = {
     val conf = new SparkConf()
     conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
-    conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+    conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
     conf.set(KRYO_USE_UNSAFE, useUnsafe)
 
     new KryoSerializer(conf).newInstance()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
index 953b651..dde0c98 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
@@ -76,7 +76,7 @@ object KryoSerializerBenchmark extends BenchmarkBase {
     conf.set(EXECUTOR_EXTRA_JAVA_OPTIONS,
       "-XX:+UseParallelGC -XX:-UseDynamicNumberOfGCThreads")
     conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
-    conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+    conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
     conf.set(KRYO_USE_POOL, usePool)
 
     if (sc != null) {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index d4fafab..397fdce 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -29,7 +29,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
   test("kryo objects are serialised consistently in different processes") {
     val conf = new SparkConf(false)
       .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
-      .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
+      .set(config.Kryo.KRYO_USER_REGISTRATORS, Seq(classOf[AppJarRegistrator].getName))
       .set(config.TASK_MAX_FAILURES, 1)
       .set(config.BLACKLIST_ENABLED, false)
 
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 4c47a67..229ef699 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils
 
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
-  conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+  conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
   conf.set(KRYO_USE_UNSAFE, false)
 
   test("SPARK-7392 configuration limits") {
@@ -313,7 +313,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     import org.apache.spark.SparkException
 
     val conf = new SparkConf(false)
-    conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
+    conf.set(KRYO_USER_REGISTRATORS, Seq("this.class.does.not.exist"))
 
     val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
     assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
@@ -412,7 +412,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
     assert(ser.getAutoReset)
     val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
-      classOf[RegistratorWithoutAutoReset].getName)
+      Seq(classOf[RegistratorWithoutAutoReset].getName))
     val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
     assert(!ser2.getAutoReset)
   }
@@ -443,7 +443,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
       .set(KRYO_REFERENCE_TRACKING, referenceTracking)
       .set(KRYO_USE_POOL, usePool)
     if (!autoReset) {
-      conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
+      conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
     }
     val ser = new KryoSerializer(conf)
     val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
@@ -530,7 +530,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
 class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, classOf[KryoSerializer].getName)
-  conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
+  conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
   conf.set(KRYO_REFERENCE_TRACKING, true)
   conf.set(SHUFFLE_MANAGER, "sort")
   conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200)
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
index dad080c..9747f57 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -52,7 +52,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {
 
   test("KryoSerializer does not support relocation when auto-reset is disabled") {
     val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
-      classOf[RegistratorWithoutAutoReset].getName)
+      Seq(classOf[RegistratorWithoutAutoReset].getName))
     val ser = new KryoSerializer(conf)
     assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
     testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
index 43de266..b20d050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -34,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSparkSessio
 
   override protected def sparkConf: SparkConf = {
     // Make sure we use the KryoRegistrator
-    super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
+    super.sparkConf.set(KRYO_USER_REGISTRATORS, Seq(TestRegistrator().getClass.getCanonicalName))
   }
 
   test("Kryo registrator") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org