You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/07/29 04:02:54 UTC
[spark] branch branch-3.0 updated: [SPARK-32283][CORE] Kryo should
support multiple user registrators
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9f18d54 [SPARK-32283][CORE] Kryo should support multiple user registrators
9f18d54 is described below
commit 9f18d54ab26d64a23438bd242e77ba27311495ec
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Wed Jul 29 03:58:03 2020 +0000
[SPARK-32283][CORE] Kryo should support multiple user registrators
### What changes were proposed in this pull request?
`spark.kryo.registrator` in 3.0 has a regression problem. From [SPARK-12080](https://issues.apache.org/jira/browse/SPARK-12080), it supports multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
.split(',').map(_.trim)
.filter(!_.isEmpty)
```
But it doesn't work in 3.0. Fix it by `toSequence` in `Kryo.scala`
### Why are the changes needed?
In previous Spark versions (2.x), multiple user registrators were supported by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
.split(',').map(_.trim)
.filter(!_.isEmpty)
```
But it doesn't work in 3.0. This should be considered a regression.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #29123 from LantaoJin/SPARK-32283.
Authored-by: LantaoJin <ji...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 26e6574d58429add645db820a83b70ef9dcd49fe)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/main/scala/org/apache/spark/internal/config/Kryo.scala | 3 ++-
core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 2 +-
.../test/scala/org/apache/spark/serializer/KryoBenchmark.scala | 2 +-
.../org/apache/spark/serializer/KryoSerializerBenchmark.scala | 2 +-
.../spark/serializer/KryoSerializerDistributedSuite.scala | 2 +-
.../org/apache/spark/serializer/KryoSerializerSuite.scala | 10 +++++-----
.../apache/spark/serializer/SerializerPropertiesSuite.scala | 2 +-
.../apache/spark/sql/DatasetSerializerRegistratorSuite.scala | 2 +-
8 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
index 646d855..90c59b0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
@@ -29,7 +29,8 @@ private[spark] object Kryo {
val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
.version("0.5.0")
.stringConf
- .createOptional
+ .toSequence
+ .createWithDefault(Nil)
val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
.version("1.2.0")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 3bc2061..72e7ee0 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -221,7 +221,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.registerKryoClasses(Array(classOf[Class1]))
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)
- conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[CustomRegistrator].getName))
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
index fd228cd..525e682 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -125,7 +125,7 @@ object KryoBenchmark extends BenchmarkBase {
def createSerializer(useUnsafe: Boolean): SerializerInstance = {
val conf = new SparkConf()
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
- conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_UNSAFE, useUnsafe)
new KryoSerializer(conf).newInstance()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
index 953b651..dde0c98 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
@@ -76,7 +76,7 @@ object KryoSerializerBenchmark extends BenchmarkBase {
conf.set(EXECUTOR_EXTRA_JAVA_OPTIONS,
"-XX:+UseParallelGC -XX:-UseDynamicNumberOfGCThreads")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
- conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_POOL, usePool)
if (sc != null) {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index d4fafab..397fdce 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -29,7 +29,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
.set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
- .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
+ .set(config.Kryo.KRYO_USER_REGISTRATORS, Seq(classOf[AppJarRegistrator].getName))
.set(config.TASK_MAX_FAILURES, 1)
.set(config.BLACKLIST_ENABLED, false)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 4c47a67..229ef699 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
- conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_UNSAFE, false)
test("SPARK-7392 configuration limits") {
@@ -313,7 +313,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
import org.apache.spark.SparkException
val conf = new SparkConf(false)
- conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
+ conf.set(KRYO_USER_REGISTRATORS, Seq("this.class.does.not.exist"))
val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
@@ -412,7 +412,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
- classOf[RegistratorWithoutAutoReset].getName)
+ Seq(classOf[RegistratorWithoutAutoReset].getName))
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
}
@@ -443,7 +443,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
.set(KRYO_REFERENCE_TRACKING, referenceTracking)
.set(KRYO_USE_POOL, usePool)
if (!autoReset) {
- conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
@@ -530,7 +530,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, classOf[KryoSerializer].getName)
- conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
+ conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
conf.set(KRYO_REFERENCE_TRACKING, true)
conf.set(SHUFFLE_MANAGER, "sort")
conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200)
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
index dad080c..9747f57 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -52,7 +52,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {
test("KryoSerializer does not support relocation when auto-reset is disabled") {
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
- classOf[RegistratorWithoutAutoReset].getName)
+ Seq(classOf[RegistratorWithoutAutoReset].getName))
val ser = new KryoSerializer(conf)
assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
index 43de266..b20d050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -34,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSparkSessio
override protected def sparkConf: SparkConf = {
// Make sure we use the KryoRegistrator
- super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
+ super.sparkConf.set(KRYO_USER_REGISTRATORS, Seq(TestRegistrator().getClass.getCanonicalName))
}
test("Kryo registrator") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org