You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/07 06:01:15 UTC

[incubator-uniffle] branch master updated: [Issue-194][Feature] Support spark 3.2.0 (#201)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6dcc9b [Issue-194][Feature] Support spark 3.2.0 (#201)
ec6dcc9b is described below

commit ec6dcc9b04b85fcaadb3d4eb81135624583018f3
Author: Xianming Lei <31...@users.noreply.github.com>
AuthorDate: Wed Sep 7 14:01:11 2022 +0800

    [Issue-194][Feature] Support spark 3.2.0 (#201)
    
    ### What changes were proposed in this pull request?
    support spark 3.2.0
    
    ### Why are the changes needed?
    After SPARK-36892, the signature of MapOutputTrackerWorker#getMapSizesByExecutorId was changed (its parameters and return type were modified), so java.lang.NoSuchMethodException is thrown when the RssShuffleManager#getExpectedTasksByExecutorId method is called. The Spark version in our production environment is 3.2.0.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Reused the existing integration tests.
    
    Co-authored-by: leixianming <le...@didiglobal.com>
---
 .github/workflows/parallel.yml                     |  1 +
 README.md                                          | 12 ++-
 .../apache/spark/shuffle/RssShuffleManager.java    | 26 ++++++-
 pom.xml                                            | 89 ++++++++++++++++++++++
 4 files changed, 122 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/parallel.yml b/.github/workflows/parallel.yml
index e684303d..b3ee8b80 100644
--- a/.github/workflows/parallel.yml
+++ b/.github/workflows/parallel.yml
@@ -61,6 +61,7 @@ jobs:
           - spark3.0
           - spark3
           - spark3.2
+          - spark3.2.0
           - mr
       fail-fast: false
     name: -P${{ matrix.profile }}
diff --git a/README.md b/README.md
index 214cfbc4..e3521acc 100644
--- a/README.md
+++ b/README.md
@@ -81,18 +81,26 @@ Build against profile Spark3(3.1.2)
 
     mvn -DskipTests clean package -Pspark3
 
-Build against Spark 3.2.x
+Build against Spark 3.2.x, except 3.2.0
 
     mvn -DskipTests clean package -Pspark3.2
 
+Build against Spark 3.2.0
+
+    mvn -DskipTests clean package -Pspark3.2.0
+
 To package the Uniffle, run:
 
     ./build_distribution.sh
 
-Package against Spark 3.2.x, run:
+Package against Spark 3.2.x, except 3.2.0, run:
 
     ./build_distribution.sh --spark3-profile 'spark3.2'
 
+Package against Spark 3.2.0, run:
+
+    ./build_distribution.sh --spark3-profile 'spark3.2.0'
+
 rss-xxx.tgz will be generated for deployment
 
 ## Deploy
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 0d0ca539..fbbef743 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.MapOutputTracker;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv;
@@ -458,7 +459,7 @@ public class RssShuffleManager implements ShuffleManager {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
     Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> mapStatusIter = null;
     // Since Spark 3.1 refactors the interface of getMapSizesByExecutorId,
-    // we use reflection and catch for the compatibility with 3.0 & 3.1
+    // we use reflection and catch for the compatibility with 3.0 & 3.1 & 3.2
     try {
       // attempt to use Spark 3.1's API
       mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
@@ -471,7 +472,7 @@ public class RssShuffleManager implements ShuffleManager {
                   endMapIndex,
                   startPartition,
                   endPartition);
-    } catch (Exception e) {
+    } catch (Exception ignored) {
       // fallback and attempt to use Spark 3.0's API
       try {
         mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
@@ -483,8 +484,25 @@ public class RssShuffleManager implements ShuffleManager {
                     shuffleId,
                     startPartition,
                     endPartition);
-      } catch (Exception ee) {
-        throw new RuntimeException(ee);
+      } catch (Exception ignored1) {
+        try {
+          // attempt to use Spark 3.2.0's API
+          // Each Spark release will be versioned: [MAJOR].[FEATURE].[MAINTENANCE].
+          // Usually we only need to adapt [MAJOR].[FEATURE]. Unfortunately,
+          // some interfaces were wrongly removed in Spark 3.2.0 and only added back in Spark 3.2.1,
+          // so we need to adapt Spark 3.2.0 specifically here.
+          mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
+              MapOutputTracker.class.getDeclaredMethod("getMapSizesByExecutorId",
+                  int.class, int.class, int.class, int.class, int.class)
+                  .invoke(SparkEnv.get().mapOutputTracker(),
+                      shuffleId,
+                      startMapIndex,
+                      endMapIndex,
+                      startPartition,
+                      endPartition);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
       }
     }
     while (mapStatusIter.hasNext()) {
diff --git a/pom.xml b/pom.xml
index 18011c27..d13c0651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1289,6 +1289,95 @@
       </dependencyManagement>
     </profile>
 
+    <profile>
+      <id>spark3.2.0</id>
+      <properties>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.2.0</spark.version>
+        <client.type>3</client.type>
+        <jackson.version>2.12.0</jackson.version>
+      </properties>
+      <modules>
+        <module>client-spark/common</module>
+        <module>client-spark/spark3</module>
+        <module>integration-test/spark-common</module>
+        <module>integration-test/spark3</module>
+      </modules>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark3</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-common</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-integration-common-test</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-integration-spark-common-test</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
     <profile>
       <id>spark3.0</id>
       <properties>