Posted to commits@spark.apache.org by do...@apache.org on 2019/12/15 01:16:07 UTC

[spark] branch master updated: [SPARK-25100][CORE] Register TaskCommitMessage to KryoSerializer

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2f5a4  [SPARK-25100][CORE] Register TaskCommitMessage to KryoSerializer
fb2f5a4 is described below

commit fb2f5a49061f4593648a9822bacaea8bfd046505
Author: xiaodeshan <xi...@xiaomi.com>
AuthorDate: Sat Dec 14 17:15:30 2019 -0800

    [SPARK-25100][CORE] Register TaskCommitMessage to KryoSerializer
    
    ## What changes were proposed in this pull request?
    
    Fix a bug where a job that stores data via saveAsNewAPIHadoopDataset fails because the class TaskCommitMessage has not been registered, when the serializer is KryoSerializer and spark.kryo.registrationRequired is true.
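
    For illustration only (not part of the original PR description): a minimal
    driver that hits this failure on versions without the fix. The app name and
    output path below are hypothetical; the config keys and APIs are Spark's own.

        import org.apache.hadoop.io.IntWritable
        import org.apache.hadoop.mapred.JobConf
        import org.apache.spark.{SparkConf, SparkContext}

        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("spark-25100-repro")  // hypothetical app name
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrationRequired", "true")
        val sc = new SparkContext(conf)

        val jobConf = new JobConf()
        jobConf.setOutputKeyClass(classOf[IntWritable])
        jobConf.setOutputValueClass(classOf[IntWritable])
        jobConf.set("mapred.output.dir", "/tmp/spark-25100-out")  // hypothetical path

        // Without the fix, this fails when the task result is serialized, with
        // IllegalArgumentException: "Class is not registered:
        //   org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"
        sc.parallelize(1 to 100).map(i => (i, 1)).saveAsNewAPIHadoopDataset(jobConf)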
    
    ## How was this patch tested?
    
    Unit tests (new cases in FileSuite and KryoSerializerSuite, included in the diff below).
    
    Closes #26714 from deshanxiao/SPARK-25100.
    
    Authored-by: xiaodeshan <xi...@xiaomi.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/serializer/KryoSerializer.scala   |  4 ++-
 .../test/scala/org/apache/spark/FileSuite.scala    | 30 ++++++++++++++++++++++
 .../spark/serializer/KryoSerializerSuite.scala     | 23 +++++++++++++++++
 3 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 6efb8b3..cdaab59 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -40,6 +40,7 @@ import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.io.FileCommitProtocol._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
@@ -469,7 +470,8 @@ private[serializer] object KryoSerializer {
     classOf[Array[String]],
     classOf[Array[Array[String]]],
     classOf[BoundedPriorityQueue[_]],
-    classOf[SparkConf]
+    classOf[SparkConf],
+    classOf[TaskCommitMessage]
   )
 
   private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
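
Context for the hunk above (an illustrative sketch, not part of the patch): with
KRYO_REGISTRATION_REQUIRED enabled, Kryo rejects at serialization time any class
missing from this registration list. TaskCommitMessage is Spark-internal API
(org.apache.spark.internal.io), so this snippet assumes it is compiled somewhere
under the org.apache.spark package tree:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.Kryo._
    import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
    import org.apache.spark.serializer.KryoSerializer

    val conf = new SparkConf(false).set(KRYO_REGISTRATION_REQUIRED, true)
    val ser = new KryoSerializer(conf).newInstance()

    // Before this commit, the serialize call throws IllegalArgumentException
    // ("Class is not registered: ...FileCommitProtocol$TaskCommitMessage");
    // with the registration above, the message round-trips cleanly.
    val msg = new TaskCommitMessage(Map("a" -> "b") -> Set("c"))
    val back = ser.deserialize[TaskCommitMessage](ser.serialize(msg))
    assert(back.obj == msg.obj)
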
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 0368d77..ed11653 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
 
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
+import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -700,4 +701,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
     assert(collectRDDAndDeleteFileBeforeCompute(true).isEmpty)
   }
+
+  test("SPARK-25100: Using KryoSerializer and" +
+      "setting registrationRequired true can lead job failed") {
+    val inputFile = new File(tempDir, "/input").getAbsolutePath
+    val textFileOutputDir = new File(tempDir, "/out1").getAbsolutePath
+    val dataSetDir = new File(tempDir, "/out2").getAbsolutePath
+
+    Utils.tryWithResource(new PrintWriter(new File(inputFile))) { writer =>
+      for (i <- 1 to 100) {
+        writer.print(i)
+        writer.write('\n')
+      }
+    }
+
+    val conf = new SparkConf(false).setMaster("local").setAppName("test")
+      .set("spark.kryo.registrationRequired", "true")
+      .set("spark.serializer", classOf[KryoSerializer].getName)
+
+    val jobConf = new JobConf()
+    jobConf.setOutputKeyClass(classOf[IntWritable])
+    jobConf.setOutputValueClass(classOf[IntWritable])
+    jobConf.set("mapred.output.dir", dataSetDir)
+
+    sc = new SparkContext(conf)
+    val pairRDD = sc.textFile(inputFile).map(x => (x, 1))
+
+    pairRDD.saveAsTextFile(textFileOutputDir)
+    pairRDD.saveAsNewAPIHadoopDataset(jobConf)
+  }
 }
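
A side note, not from the commit: on Spark versions that do not carry this fix,
one possible workaround is to register the class by its binary name from user
code. A hedged sketch (the class name is real; registering an internal Spark
class this way should be verified against your Spark version):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      // registerKryoClasses appends to spark.kryo.classesToRegister.
      .registerKryoClasses(Array(
        Class.forName(
          "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")))
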
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index d7c1512..c55efe9 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -34,6 +34,7 @@ import org.roaringbitmap.RoaringBitmap
 import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
@@ -358,6 +359,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("registration of TaskCommitMessage") {
+    val conf = new SparkConf(false)
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
+
+    val ser = new KryoSerializer(conf).newInstance()
+    // In HadoopMapReduceCommitProtocol#commitTask
+    val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map()
+    addedAbsPathFiles.put("test1", "test1")
+    addedAbsPathFiles.put("test2", "test2")
+
+    val partitionPaths: mutable.Set[String] = mutable.Set()
+    partitionPaths.add("test3")
+
+    val taskCommitMessage1 = new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+    val taskCommitMessage2 = new TaskCommitMessage(Map.empty -> Set.empty)
+    Seq(taskCommitMessage1, taskCommitMessage2).foreach { taskCommitMessage =>
+      val obj1 = ser.deserialize[TaskCommitMessage](ser.serialize(taskCommitMessage)).obj
+      val obj2 = taskCommitMessage.obj
+      assert(obj1 == obj2)
+    }
+  }
+
   test("serialization buffer overflow reporting") {
     import org.apache.spark.SparkException
     val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key

