You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:36 UTC
[incubator-uniffle] 01/02: [Feature] [0.2] Support Spark 3.2 (#88)
This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 7f3c44a9a051310e991034162ef53e2835490e71
Author: roryqi <je...@gmail.com>
AuthorDate: Tue Mar 1 20:33:34 2022 +0800
[Feature] [0.2] Support Spark 3.2 (#88)
### What changes were proposed in this pull request?
Support Spark 3.2
### Why are the changes needed?
We need to support more Spark versions.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GitHub Actions (GA) passed, including the profiles spark2, spark3, spark3.0, spark3.1, and spark3.2.
Co-authored-by: roryqi <ro...@tencent.com>
---
README.md | 2 +-
.../spark/shuffle/writer/WriteBufferManager.java | 3 +-
.../spark/shuffle/writer/RssShuffleWriter.java | 5 +
.../tencent/rss/test/SparkIntegrationTestBase.java | 4 +
integration-test/spark3/pom.xml | 2 +
pom.xml | 106 ++++++++++++++++++++-
6 files changed, 119 insertions(+), 3 deletions(-)
diff --git a/README.md b/README.md
index a785f47..ac3e92a 100644
--- a/README.md
+++ b/README.md
@@ -36,7 +36,7 @@ The shuffle data is stored with index file and data file. Data file has all bloc
![Rss Shuffle_Write](docs/asset/rss_data_format.png)
## Supported Spark Version
-Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x
+Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x, Spark 3.2.x
Note: To support dynamic allocation, the patch(which is included in client-spark/patch folder) should be applied to Spark
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 1b26f0b..91cc6a7 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.Serializer;
@@ -86,7 +87,7 @@ public class WriteBufferManager extends MemoryConsumer {
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
TaskMemoryManager taskMemoryManager,
ShuffleWriteMetrics shuffleWriteMetrics) {
- super(taskMemoryManager);
+ super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
this.bufferSize = bufferManagerOptions.getBufferSize();
this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
this.instance = serializer.newInstance();
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2a4beb6..a7e4480 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -171,6 +171,11 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
+ bufferManager.getManagerCostInfo());
}
+ // Only push-based shuffle uses this interface; RSS is not used when push-based shuffle is enabled.
+ public long[] getPartitionLengths() {
+ return new long[0];
+ }
+
private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList, Set<Long> blockIds) {
if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
shuffleBlockInfoList.forEach(sbi -> {
diff --git a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
index 06789d2..1e15ba6 100644
--- a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
+++ b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
@@ -21,6 +21,9 @@ package com.tencent.rss.test;
import static org.junit.Assert.assertEquals;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssClientConfig;
import org.apache.spark.sql.SparkSession;
@@ -50,6 +53,7 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase {
Map resultWithoutRss = runSparkApp(sparkConf, fileName);
long durationWithoutRss = System.currentTimeMillis() - start;
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
updateSparkConfWithRss(sparkConf);
updateSparkConfCustomer(sparkConf);
start = System.currentTimeMillis();
diff --git a/integration-test/spark3/pom.xml b/integration-test/spark3/pom.xml
index d4d024a..5dc2f39 100644
--- a/integration-test/spark3/pom.xml
+++ b/integration-test/spark3/pom.xml
@@ -108,11 +108,13 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
<scope>test</scope>
</dependency>
diff --git a/pom.xml b/pom.xml
index 628c789..1cc35f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<metrics.version>3.1.0</metrics.version>
<mockito.inline.version>3.5.15</mockito.inline.version>
- <netty.version>4.1.47.Final</netty.version>
+ <netty.version>4.1.68.Final</netty.version>
<picocli.version>4.5.2</picocli.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<prometheus.simpleclient.version>0.9.0</prometheus.simpleclient.version>
@@ -1010,6 +1010,7 @@
<spark.version>2.3.4</spark.version>
<client.type>2</client.type>
<jackson.version>2.9.0</jackson.version>
+ <netty.version>4.1.47.Final</netty.version>
</properties>
<modules>
<module>client-spark/common</module>
@@ -1018,6 +1019,11 @@
</modules>
<dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
<dependency>
<groupId>com.tencent.rss</groupId>
<artifactId>rss-client-spark2</artifactId>
@@ -1094,6 +1100,104 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.tencent.rss</groupId>
+ <artifactId>rss-client-spark-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.tencent.rss</groupId>
+ <artifactId>rss-client-spark-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>com.tencent.rss</groupId>
+ <artifactId>rss-integration-common-test</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.tencent.rss</groupId>
+ <artifactId>rss-integration-spark-common-test</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
+ <profile>
+ <id>spark3.2</id>
+ <properties>
+ <scala.binary.version>2.12</scala.binary.version>
+ <spark.version>3.2.1</spark.version>
+ <client.type>3</client.type>
+ <jackson.version>2.12.0</jackson.version>
+ </properties>
+ <modules>
+ <module>client-spark/common</module>
+ <module>client-spark/spark3</module>
+ <module>integration-test/spark-common</module>
+ <module>integration-test/spark3</module>
+ </modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.tencent.rss</groupId>
+ <artifactId>rss-client-spark3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.tencent.rss</groupId>