You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:36 UTC

[incubator-uniffle] 01/02: [Feature] [0.2] Support Spark 3.2 (#88)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 7f3c44a9a051310e991034162ef53e2835490e71
Author: roryqi <je...@gmail.com>
AuthorDate: Tue Mar 1 20:33:34 2022 +0800

    [Feature] [0.2] Support Spark 3.2 (#88)
    
    ### What changes were proposed in this pull request?
    Support Spark 3.2
    
    ### Why are the changes needed?
    We need to support more Spark versions.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GA passed (including profiles spark2, spark3, spark3.0, spark3.1, and spark3.2)
    
    Co-authored-by: roryqi <ro...@tencent.com>
---
 README.md                                          |   2 +-
 .../spark/shuffle/writer/WriteBufferManager.java   |   3 +-
 .../spark/shuffle/writer/RssShuffleWriter.java     |   5 +
 .../tencent/rss/test/SparkIntegrationTestBase.java |   4 +
 integration-test/spark3/pom.xml                    |   2 +
 pom.xml                                            | 106 ++++++++++++++++++++-
 6 files changed, 119 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index a785f47..ac3e92a 100644
--- a/README.md
+++ b/README.md
@@ -36,7 +36,7 @@ The shuffle data is stored with index file and data file. Data file has all bloc
 ![Rss Shuffle_Write](docs/asset/rss_data_format.png)
 
 ## Supported Spark Version
-Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x
+Currently supported: Spark 2.3.x, Spark 2.4.x, Spark 3.0.x, Spark 3.1.x, Spark 3.2.x
 
 Note: To support dynamic allocation, the patch(which is included in client-spark/patch folder) should be applied to Spark
 
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 1b26f0b..91cc6a7 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.Serializer;
@@ -86,7 +87,7 @@ public class WriteBufferManager extends MemoryConsumer {
       Map<Integer, List<ShuffleServerInfo>> partitionToServers,
       TaskMemoryManager taskMemoryManager,
       ShuffleWriteMetrics shuffleWriteMetrics) {
-    super(taskMemoryManager);
+    super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
     this.bufferSize = bufferManagerOptions.getBufferSize();
     this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
     this.instance = serializer.newInstance();
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2a4beb6..a7e4480 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -171,6 +171,11 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         + bufferManager.getManagerCostInfo());
   }
 
+  // Only push-based shuffle uses this interface, and RSS is not used when push-based shuffle is enabled, so return an empty array.
+  public long[] getPartitionLengths() {
+    return new long[0];
+  }
+
   private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList, Set<Long> blockIds) {
     if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
       shuffleBlockInfoList.forEach(sbi -> {
diff --git a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
index 06789d2..1e15ba6 100644
--- a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
+++ b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
@@ -21,6 +21,9 @@ package com.tencent.rss.test;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.RssClientConfig;
 import org.apache.spark.sql.SparkSession;
@@ -50,6 +53,7 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase {
     Map resultWithoutRss = runSparkApp(sparkConf, fileName);
     long durationWithoutRss = System.currentTimeMillis() - start;
 
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
     updateSparkConfWithRss(sparkConf);
     updateSparkConfCustomer(sparkConf);
     start = System.currentTimeMillis();
diff --git a/integration-test/spark3/pom.xml b/integration-test/spark3/pom.xml
index d4d024a..5dc2f39 100644
--- a/integration-test/spark3/pom.xml
+++ b/integration-test/spark3/pom.xml
@@ -108,11 +108,13 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/pom.xml b/pom.xml
index 628c789..1cc35f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
     <maven.compiler.target>${java.version}</maven.compiler.target>
     <metrics.version>3.1.0</metrics.version>
     <mockito.inline.version>3.5.15</mockito.inline.version>
-    <netty.version>4.1.47.Final</netty.version>
+    <netty.version>4.1.68.Final</netty.version>
     <picocli.version>4.5.2</picocli.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <prometheus.simpleclient.version>0.9.0</prometheus.simpleclient.version>
@@ -1010,6 +1010,7 @@
         <spark.version>2.3.4</spark.version>
         <client.type>2</client.type>
         <jackson.version>2.9.0</jackson.version>
+        <netty.version>4.1.47.Final</netty.version>
       </properties>
       <modules>
         <module>client-spark/common</module>
@@ -1018,6 +1019,11 @@
       </modules>
       <dependencyManagement>
         <dependencies>
+          <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+          </dependency>
           <dependency>
             <groupId>com.tencent.rss</groupId>
             <artifactId>rss-client-spark2</artifactId>
@@ -1094,6 +1100,104 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-client-spark-common</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-client-spark-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-integration-common-test</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-integration-spark-common-test</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
+    <profile>
+      <id>spark3.2</id>
+      <properties>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.2.1</spark.version>
+        <client.type>3</client.type>
+        <jackson.version>2.12.0</jackson.version>
+      </properties>
+      <modules>
+        <module>client-spark/common</module>
+        <module>client-spark/spark3</module>
+        <module>integration-test/spark-common</module>
+        <module>integration-test/spark3</module>
+      </modules>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-client-spark3</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
           </dependency>
           <dependency>
             <groupId>com.tencent.rss</groupId>