You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/12/15 01:16:07 UTC
[spark] branch master updated: [SPARK-25100][CORE] Register
TaskCommitMessage to KyroSerializer
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fb2f5a4 [SPARK-25100][CORE] Register TaskCommitMessage to KyroSerializer
fb2f5a4 is described below
commit fb2f5a49061f4593648a9822bacaea8bfd046505
Author: xiaodeshan <xi...@xiaomi.com>
AuthorDate: Sat Dec 14 17:15:30 2019 -0800
[SPARK-25100][CORE] Register TaskCommitMessage to KyroSerializer
## What changes were proposed in this pull request?
Fix a bug where a job that invokes saveAsNewAPIHadoopDataset to store data fails when the serializer is KryoSerializer and spark.kryo.registrationRequired is true, because the class TaskCommitMessage has not been registered with Kryo.
## How was this patch tested?
Added unit tests in FileSuite and KryoSerializerSuite.
Closes #26714 from deshanxiao/SPARK-25100.
Authored-by: xiaodeshan <xi...@xiaomi.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/serializer/KryoSerializer.scala | 4 ++-
.../test/scala/org/apache/spark/FileSuite.scala | 30 ++++++++++++++++++++++
.../spark/serializer/KryoSerializerSuite.scala | 23 +++++++++++++++++
3 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 6efb8b3..cdaab59 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -40,6 +40,7 @@ import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.io.FileCommitProtocol._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
@@ -469,7 +470,8 @@ private[serializer] object KryoSerializer {
classOf[Array[String]],
classOf[Array[Array[String]]],
classOf[BoundedPriorityQueue[_]],
- classOf[SparkConf]
+ classOf[SparkConf],
+ classOf[TaskCommitMessage]
)
private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 0368d77..ed11653 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
+import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -700,4 +701,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
}
+
+  test("SPARK-25100: Using KryoSerializer and " +
+    "setting registrationRequired true should not fail the job") {
+    // Regression test: with spark.kryo.registrationRequired=true, saving through
+    // saveAsNewAPIHadoopDataset used to fail because TaskCommitMessage was not
+    // registered with Kryo.
+    val inputFile = new File(tempDir, "/input").getAbsolutePath
+    val textFileOutputDir = new File(tempDir, "/out1").getAbsolutePath
+    val dataSetDir = new File(tempDir, "/out2").getAbsolutePath
+
+    // Input: the numbers 1..100, one per line.
+    Utils.tryWithResource(new PrintWriter(new File(inputFile))) { writer =>
+      for (i <- 1 to 100) {
+        writer.print(i)
+        writer.write('\n')
+      }
+    }
+
+    val conf = new SparkConf(false)
+      .setMaster("local")
+      .setAppName("test")
+      .set("spark.serializer", classOf[KryoSerializer].getName)
+      .set("spark.kryo.registrationRequired", "true")
+
+    val jobConf = new JobConf()
+    jobConf.setOutputKeyClass(classOf[IntWritable])
+    jobConf.setOutputValueClass(classOf[IntWritable])
+    jobConf.set("mapred.output.dir", dataSetDir)
+
+    sc = new SparkContext(conf)
+    val pairRDD = sc.textFile(inputFile).map(x => (x, 1))
+
+    pairRDD.saveAsTextFile(textFileOutputDir)
+    pairRDD.saveAsNewAPIHadoopDataset(jobConf)
+
+    // Not throwing is the primary check; also verify every record reached both outputs.
+    assert(sc.textFile(textFileOutputDir).count() === 100)
+    assert(sc.textFile(dataSetDir).count() === 100)
+  }
}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index d7c1512..c55efe9 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -34,6 +34,7 @@ import org.roaringbitmap.RoaringBitmap
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
@@ -358,6 +359,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
}
+  test("registration of TaskCommitMessage") {
+    // With KRYO_REGISTRATION_REQUIRED enabled, serializing an unregistered class fails,
+    // so a successful round trip shows TaskCommitMessage is registered.
+    val conf = new SparkConf(false).set(KRYO_REGISTRATION_REQUIRED, true)
+    val ser = new KryoSerializer(conf).newInstance()
+
+    // Payload shaped like the one built in HadoopMapReduceCommitProtocol#commitTask:
+    // (addedAbsPathFiles map, partitionPaths set).
+    val populated = new TaskCommitMessage(
+      Map("test1" -> "test1", "test2" -> "test2") -> Set("test3"))
+    val empty = new TaskCommitMessage(Map.empty -> Set.empty)
+
+    for (original <- Seq(populated, empty)) {
+      val roundTripped = ser.deserialize[TaskCommitMessage](ser.serialize(original))
+      assert(roundTripped.obj == original.obj)
+    }
+  }
+
test("serialization buffer overflow reporting") {
import org.apache.spark.SparkException
val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org