You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/07 06:01:15 UTC
[incubator-uniffle] branch master updated: [Issue-194][Feature] Support spark 3.2.0 (#201)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ec6dcc9b [Issue-194][Feature] Support spark 3.2.0 (#201)
ec6dcc9b is described below
commit ec6dcc9b04b85fcaadb3d4eb81135624583018f3
Author: Xianming Lei <31...@users.noreply.github.com>
AuthorDate: Wed Sep 7 14:01:11 2022 +0800
[Issue-194][Feature] Support spark 3.2.0 (#201)
### What changes were proposed in this pull request?
support spark 3.2.0
### Why are the changes needed?
After SPARK-36892, the signature of MapOutputTrackerWorker#getMapSizesByExecutorId was changed (its parameters and return type differ from earlier releases), so a java.lang.NoSuchMethodException is thrown when the RssShuffleManager#getExpectedTasksByExecutorId method is called. The Spark version in our production environment is 3.2.0.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
reused integration-test
Co-authored-by: leixianming <le...@didiglobal.com>
---
.github/workflows/parallel.yml | 1 +
README.md | 12 ++-
.../apache/spark/shuffle/RssShuffleManager.java | 26 ++++++-
pom.xml | 89 ++++++++++++++++++++++
4 files changed, 122 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/parallel.yml b/.github/workflows/parallel.yml
index e684303d..b3ee8b80 100644
--- a/.github/workflows/parallel.yml
+++ b/.github/workflows/parallel.yml
@@ -61,6 +61,7 @@ jobs:
- spark3.0
- spark3
- spark3.2
+ - spark3.2.0
- mr
fail-fast: false
name: -P${{ matrix.profile }}
diff --git a/README.md b/README.md
index 214cfbc4..e3521acc 100644
--- a/README.md
+++ b/README.md
@@ -81,18 +81,26 @@ Build against profile Spark3(3.1.2)
mvn -DskipTests clean package -Pspark3
-Build against Spark 3.2.x
+Build against Spark 3.2.x, except 3.2.0
mvn -DskipTests clean package -Pspark3.2
+Build against Spark 3.2.0
+
+ mvn -DskipTests clean package -Pspark3.2.0
+
To package the Uniffle, run:
./build_distribution.sh
-Package against Spark 3.2.x, run:
+Package against Spark 3.2.x, except 3.2.0, run:
./build_distribution.sh --spark3-profile 'spark3.2'
+Package against Spark 3.2.0, run:
+
+ ./build_distribution.sh --spark3-profile 'spark3.2.0'
+
rss-xxx.tgz will be generated for deployment
## Deploy
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 0d0ca539..fbbef743 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.MapOutputTracker;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
@@ -458,7 +459,7 @@ public class RssShuffleManager implements ShuffleManager {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> mapStatusIter = null;
// Since Spark 3.1 refactors the interface of getMapSizesByExecutorId,
- // we use reflection and catch for the compatibility with 3.0 & 3.1
+ // we use reflection and catch for the compatibility with 3.0 & 3.1 & 3.2
try {
// attempt to use Spark 3.1's API
mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
@@ -471,7 +472,7 @@ public class RssShuffleManager implements ShuffleManager {
endMapIndex,
startPartition,
endPartition);
- } catch (Exception e) {
+ } catch (Exception ignored) {
// fallback and attempt to use Spark 3.0's API
try {
mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
@@ -483,8 +484,25 @@ public class RssShuffleManager implements ShuffleManager {
shuffleId,
startPartition,
endPartition);
- } catch (Exception ee) {
- throw new RuntimeException(ee);
+ } catch (Exception ignored1) {
+ try {
+ // attempt to use Spark 3.2.0's API
+ // Each Spark release will be versioned: [MAJOR].[FEATURE].[MAINTENANCE].
+ // Usually we only need to adapt [MAJOR].[FEATURE] . Unfortunately,
+ // some interfaces were removed wrongly in Spark 3.2.0. And they were added by Spark 3.2.1.
+ // So we need to adapt Spark 3.2.0 here
+ mapStatusIter = (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
+ MapOutputTracker.class.getDeclaredMethod("getMapSizesByExecutorId",
+ int.class, int.class, int.class, int.class, int.class)
+ .invoke(SparkEnv.get().mapOutputTracker(),
+ shuffleId,
+ startMapIndex,
+ endMapIndex,
+ startPartition,
+ endPartition);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
while (mapStatusIter.hasNext()) {
diff --git a/pom.xml b/pom.xml
index 18011c27..d13c0651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1289,6 +1289,95 @@
</dependencyManagement>
</profile>
+ <profile>
+ <id>spark3.2.0</id>
+ <properties>
+ <scala.binary.version>2.12</scala.binary.version>
+ <spark.version>3.2.0</spark.version>
+ <client.type>3</client.type>
+ <jackson.version>2.12.0</jackson.version>
+ </properties>
+ <modules>
+ <module>client-spark/common</module>
+ <module>client-spark/spark3</module>
+ <module>integration-test/spark-common</module>
+ <module>integration-test/spark3</module>
+ </modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-integration-common-test</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-integration-spark-common-test</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
<profile>
<id>spark3.0</id>
<properties>