You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/21 03:37:31 UTC

[incubator-uniffle] branch master updated: [ISSUE-342][Improvement] Check Spark Serializer type (#344)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 110bb942 [ISSUE-342][Improvement] Check Spark Serializer type (#344)
110bb942 is described below

commit 110bb942fad885562a6bcf29b4b92eccd16ca616
Author: achong <12...@qq.com>
AuthorDate: Mon Nov 21 11:37:27 2022 +0800

    [ISSUE-342][Improvement] Check Spark Serializer type (#344)
    
    ### What changes were proposed in this pull request?
    Spark has multiple serializers. We support only those Spark serializers for which supportsRelocationOfSerializedObjects is true.
    You can see https://github.com/apache/spark/blob/25849684b78cca6651e25d6efc9644a576e7e20f/core/src/main/scala/org/apache/spark/serializer/Serializer.scala#L98
    
    Spark has three kinds of serializers:
    org.apache.spark.serializer.JavaSerializer
    org.apache.spark.sql.execution.UnsafeRowSerializer
    org.apache.spark.serializer.KryoSerializer
    Only org.apache.spark.serializer.JavaSerializer does not support relocation of serialized objects.
    
    
    ### Why are the changes needed?
    So when we find that the configured serializer is org.apache.spark.serializer.JavaSerializer, we should throw an exception.
---
 .../java/org/apache/spark/shuffle/RssShuffleManager.java    | 13 +++++++++++++
 .../java/org/apache/spark/shuffle/RssShuffleManager.java    | 12 ++++++++++++
 .../test/java/org/apache/uniffle/test/GetReaderTest.java    |  1 +
 .../test/java/org/apache/uniffle/test/GetReaderTest.java    |  1 +
 4 files changed, 27 insertions(+)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index ecdbdaac..c045f399 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -203,6 +203,19 @@ public class RssShuffleManager implements ShuffleManager {
   // pass that ShuffleHandle to executors (getWriter/getReader).
   @Override
   public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
+
+    //Spark have three kinds of serializer:
+    //org.apache.spark.serializer.JavaSerializer
+    //org.apache.spark.sql.execution.UnsafeRowSerializer
+    //org.apache.spark.serializer.KryoSerializer,
+    //Only org.apache.spark.serializer.JavaSerializer don't support RelocationOfSerializedObjects.
+    //So when we find the parameters to use org.apache.spark.serializer.JavaSerializer, We should throw an exception
+    if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
+      throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the"
+              + " serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object "
+              + "relocation.");
+    }
+
     // If yarn enable retry ApplicationMaster, appId will be not unique and shuffle data will be incorrect,
     // appId + timestamp can avoid such problem,
     // can't get appId in construct because SparkEnv is not created yet,
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 36ecbc8c..1691dfe4 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -265,6 +265,18 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<K, V, C> dependency) {
 
+    //Spark have three kinds of serializer:
+    //org.apache.spark.serializer.JavaSerializer
+    //org.apache.spark.sql.execution.UnsafeRowSerializer
+    //org.apache.spark.serializer.KryoSerializer,
+    //Only org.apache.spark.serializer.JavaSerializer don't support RelocationOfSerializedObjects.
+    //So when we find the parameters to use org.apache.spark.serializer.JavaSerializer, We should throw an exception
+    if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
+      throw new IllegalArgumentException("Can't use serialized shuffle for shuffleId: " + shuffleId + ", because the"
+              + " serializer: " + SparkEnv.get().serializer().getClass().getName() + " does not support object "
+              + "relocation.");
+    }
+
     if (id.get() == null) {
       id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + System.currentTimeMillis());
     }
diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index b83c6279..50f47cb5 100644
--- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -55,6 +55,7 @@ public class GetReaderTest extends IntegrationTestBase {
   public void test() throws Exception {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
     sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.setMaster("local[4]");
     final String remoteStorage1 = "hdfs://h1/p1";
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 0d1271d8..814566c4 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -68,6 +68,7 @@ public class GetReaderTest extends IntegrationTestBase {
   public void test() throws Exception {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
     sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.setMaster("local[4]");
     final String remoteStorage1 = "hdfs://h1/p1";