Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:35 UTC

[incubator-uniffle] branch branch-0.2.0 created (now 75b5376)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a change to branch branch-0.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


      at 75b5376  [Bugfix] Fix incorrect index file (#92)

This branch includes the following new commits:

     new 7f3c44a  [Feature] [0.2] Support Spark 3.2 (#88)
     new 75b5376  [Bugfix] Fix incorrect index file (#92)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-uniffle] 02/02: [Bugfix] Fix incorrect index file (#92)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 75b537661f1a29291f199974c6e7fa1e39197d72
Author: roryqi <je...@gmail.com>
AuthorDate: Tue Mar 8 16:31:33 2022 +0800

    [Bugfix] Fix incorrect index file (#92)
    
    ### What changes were proposed in this pull request?
    Modify the method that calculates the offset in the index file.
    
    ### Why are the changes needed?
    Without this patch, query24a fails when we run the 10 TB TPC-DS benchmark.
    <img width="361" alt="WeCom screenshot_6dc451cf-dbf4-4257-b680-e79346cd582d" src="https://user-images.githubusercontent.com/8159038/157178756-d8a39b3f-0ea6-4864-ac68-ee382a88bb0f.png">
    Once more than Integer.MAX_VALUE bytes have been written to dataOutputStream, dataOutputStream.size() stops
    increasing and stays pinned at Integer.MAX_VALUE, so every offset derived from it afterwards is wrong.
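
    To see the failure mode in isolation, here is a minimal standalone sketch (not part of the patch; the class
    name is made up for illustration). It streams roughly 4.3 GB into a DataOutputStream backed by a discarding
    sink and shows size() pinned at Integer.MAX_VALUE, matching DataOutputStream's documented behavior of
    wrapping its int byte counter to Integer.MAX_VALUE on overflow:

        import java.io.DataOutputStream;
        import java.io.IOException;
        import java.io.OutputStream;

        public class SizeSaturationDemo {
          public static void main(String[] args) throws IOException {
            // Sink that discards every byte, so the demo needs no disk space.
            OutputStream sink = new OutputStream() {
              @Override public void write(int b) { }
              @Override public void write(byte[] b, int off, int len) { }
            };
            DataOutputStream out = new DataOutputStream(sink);
            byte[] chunk = new byte[Integer.MAX_VALUE / 100];  // ~21 MB per write
            for (int i = 0; i < 200; i++) {                    // ~4.3 GB in total
              out.write(chunk);
            }
            // Prints 2147483647 (Integer.MAX_VALUE), not the ~4.3 GB actually
            // written, because the internal int counter has saturated.
            System.out.println(out.size());
            out.close();
          }
        }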
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added new unit tests.
    
    Co-authored-by: roryqi <ro...@tencent.com>
---
 .../rss/storage/handler/impl/LocalFileWriter.java       |  6 ++----
 .../rss/storage/handler/impl/LocalFileHandlerTest.java  | 17 +++++++++++++++++
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/storage/src/main/java/com/tencent/rss/storage/handler/impl/LocalFileWriter.java b/storage/src/main/java/com/tencent/rss/storage/handler/impl/LocalFileWriter.java
index 10185a4..609db7e 100644
--- a/storage/src/main/java/com/tencent/rss/storage/handler/impl/LocalFileWriter.java
+++ b/storage/src/main/java/com/tencent/rss/storage/handler/impl/LocalFileWriter.java
@@ -30,21 +30,19 @@ public class LocalFileWriter implements Closeable {
 
   private DataOutputStream dataOutputStream;
   private FileOutputStream fileOutputStream;
-  private long initSize;
   private long nextOffset;
 
   public LocalFileWriter(File file) throws IOException {
     fileOutputStream = new FileOutputStream(file, true);
     // init fsDataOutputStream
     dataOutputStream = new DataOutputStream(fileOutputStream);
-    initSize = file.length();
-    nextOffset = initSize;
+    nextOffset = file.length();
   }
 
   public void writeData(byte[] data) throws IOException {
     if (data != null && data.length > 0) {
       dataOutputStream.write(data);
-      nextOffset = initSize + dataOutputStream.size();
+      nextOffset = nextOffset + data.length;
     }
   }
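
In short, the fix stops deriving offsets from the int-typed dataOutputStream.size() and instead keeps a long-typed
running total (nextOffset + data.length), so index offsets stay correct once a file grows past Integer.MAX_VALUE bytes.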
 
diff --git a/storage/src/test/java/com/tencent/rss/storage/handler/impl/LocalFileHandlerTest.java b/storage/src/test/java/com/tencent/rss/storage/handler/impl/LocalFileHandlerTest.java
index 969944d..ce8915b 100644
--- a/storage/src/test/java/com/tencent/rss/storage/handler/impl/LocalFileHandlerTest.java
+++ b/storage/src/test/java/com/tencent/rss/storage/handler/impl/LocalFileHandlerTest.java
@@ -39,6 +39,7 @@ import com.tencent.rss.storage.handler.api.ServerReadHandler;
 import com.tencent.rss.storage.handler.api.ShuffleWriteHandler;
 import com.tencent.rss.storage.util.ShuffleStorageUtils;
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -53,6 +54,7 @@ public class LocalFileHandlerTest {
   @Test
   public void writeTest() throws Exception {
     File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
     File dataDir1 = new File(tmpDir, "data1");
     File dataDir2 = new File(tmpDir, "data2");
     String[] basePaths = new String[]{dataDir1.getAbsolutePath(),
@@ -111,6 +113,21 @@ public class LocalFileHandlerTest {
     }
   }
 
+  @Test
+  public void writeBigDataTest() throws IOException {
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    File writeFile = new File(tmpDir, "writetest");
+    LocalFileWriter writer = new LocalFileWriter(writeFile);
+    int size = Integer.MAX_VALUE / 100;
+    byte[] data = new byte[size];
+    for (int i = 0; i < 200; i++) {
+      writer.writeData(data);
+    }
+    long totalSize = 200L * size;
+    assertEquals(writer.nextOffset(), totalSize);
+  }
+
 
   private void writeTestData(
       ShuffleWriteHandler writeHandler,


[incubator-uniffle] 01/02: [Feature] [0.2] Support Spark 3.2 (#88)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 7f3c44a9a051310e991034162ef53e2835490e71
Author: roryqi <je...@gmail.com>
AuthorDate: Tue Mar 1 20:33:34 2022 +0800

    [Feature] [0.2] Support Spark 3.2 (#88)
    
    ### What changes were proposed in this pull request?
    Support Spark 3.2
    
    ### Why are the changes needed?
    We need to support more Spark versions.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GitHub Actions (GA) passed, including the profiles spark2, spark3, spark3.0, spark3.1, and spark3.2.
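
    For local verification, each profile can be exercised with a standard Maven build, for example
    mvn clean package -Pspark3.2 (illustrative command; the profile ids are the ones defined in pom.xml below).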
    
    Co-authored-by: roryqi <ro...@tencent.com>
---
 README.md                                          |   2 +-
 .../spark/shuffle/writer/WriteBufferManager.java   |   3 +-
 .../spark/shuffle/writer/RssShuffleWriter.java     |   5 +
 .../tencent/rss/test/SparkIntegrationTestBase.java |   4 +
 integration-test/spark3/pom.xml                    |   2 +
 pom.xml                                            | 106 ++++++++++++++++++++-
 6 files changed, 119 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index a785f47..ac3e92a 100644
--- a/README.md
+++ b/README.md
@@ -36,7 +36,7 @@ The shuffle data is stored with index file and data file. Data file has all bloc
 ![Rss Shuffle_Write](docs/asset/rss_data_format.png)
 
 ## Supported Spark Version
-Current support Spark 2.3.x, Spark 2.4.x, Spark3.0.x, Spark 3.1.x
+Currently supports Spark 2.3.x, Spark 2.4.x, Spark 3.0.x, Spark 3.1.x, Spark 3.2.x
 
 Note: To support dynamic allocation, the patch(which is included in client-spark/patch folder) should be applied to Spark
 
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 1b26f0b..91cc6a7 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.Serializer;
@@ -86,7 +87,7 @@ public class WriteBufferManager extends MemoryConsumer {
       Map<Integer, List<ShuffleServerInfo>> partitionToServers,
       TaskMemoryManager taskMemoryManager,
       ShuffleWriteMetrics shuffleWriteMetrics) {
-    super(taskMemoryManager);
+    super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
     this.bufferSize = bufferManagerOptions.getBufferSize();
     this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
     this.instance = serializer.newInstance();
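
Context for the super() change above: on Spark 3.2 the single-argument MemoryConsumer constructor is no longer
usable from subclasses (presumably why this patch is needed), so WriteBufferManager now passes an explicit page
size and memory mode. A minimal sketch of the subclassing pattern, with a hypothetical consumer class, assuming
Spark's MemoryConsumer and TaskMemoryManager APIs:

    import java.io.IOException;

    import org.apache.spark.memory.MemoryConsumer;
    import org.apache.spark.memory.MemoryMode;
    import org.apache.spark.memory.TaskMemoryManager;

    // Hypothetical consumer for illustration; the three-argument super() call
    // is the form that compiles against Spark 3.2 as well as earlier releases.
    public class OnHeapConsumer extends MemoryConsumer {
      public OnHeapConsumer(TaskMemoryManager taskMemoryManager) {
        super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
      }

      @Override
      public long spill(long size, MemoryConsumer trigger) throws IOException {
        return 0L; // this sketch buffers nothing, so there is nothing to spill
      }
    }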
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2a4beb6..a7e4480 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -171,6 +171,11 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
         + bufferManager.getManagerCostInfo());
   }
 
+  // only push-based shuffle uses this interface, but RSS won't be used when push-based shuffle is enabled.
+  public long[] getPartitionLengths() {
+    return new long[0];
+  }
+
   private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList, Set<Long> blockIds) {
     if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
       shuffleBlockInfoList.forEach(sbi -> {
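
Note on getPartitionLengths() above: Spark 3.2's shuffle writer API expects this method (it is consumed by
push-based shuffle), and since RSS is not used together with push-based shuffle, returning an empty array is
enough to satisfy the contract.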
diff --git a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
index 06789d2..1e15ba6 100644
--- a/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
+++ b/integration-test/spark-common/src/test/java/com/tencent/rss/test/SparkIntegrationTestBase.java
@@ -21,6 +21,9 @@ package com.tencent.rss.test;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.RssClientConfig;
 import org.apache.spark.sql.SparkSession;
@@ -50,6 +53,7 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase {
     Map resultWithoutRss = runSparkApp(sparkConf, fileName);
     long durationWithoutRss = System.currentTimeMillis() - start;
 
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
     updateSparkConfWithRss(sparkConf);
     updateSparkConfCustomer(sparkConf);
     start = System.currentTimeMillis();
diff --git a/integration-test/spark3/pom.xml b/integration-test/spark3/pom.xml
index d4d024a..5dc2f39 100644
--- a/integration-test/spark3/pom.xml
+++ b/integration-test/spark3/pom.xml
@@ -108,11 +108,13 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/pom.xml b/pom.xml
index 628c789..1cc35f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
     <maven.compiler.target>${java.version}</maven.compiler.target>
     <metrics.version>3.1.0</metrics.version>
     <mockito.inline.version>3.5.15</mockito.inline.version>
-    <netty.version>4.1.47.Final</netty.version>
+    <netty.version>4.1.68.Final</netty.version>
     <picocli.version>4.5.2</picocli.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <prometheus.simpleclient.version>0.9.0</prometheus.simpleclient.version>
@@ -1010,6 +1010,7 @@
         <spark.version>2.3.4</spark.version>
         <client.type>2</client.type>
         <jackson.version>2.9.0</jackson.version>
+        <netty.version>4.1.47.Final</netty.version>
       </properties>
       <modules>
         <module>client-spark/common</module>
@@ -1018,6 +1019,11 @@
       </modules>
       <dependencyManagement>
         <dependencies>
+          <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+          </dependency>
           <dependency>
             <groupId>com.tencent.rss</groupId>
             <artifactId>rss-client-spark2</artifactId>
@@ -1094,6 +1100,104 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-client-spark-common</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-client-spark-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-integration-common-test</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-integration-spark-common-test</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
+    <profile>
+      <id>spark3.2</id>
+      <properties>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.2.1</spark.version>
+        <client.type>3</client.type>
+        <jackson.version>2.12.0</jackson.version>
+      </properties>
+      <modules>
+        <module>client-spark/common</module>
+        <module>client-spark/spark3</module>
+        <module>integration-test/spark-common</module>
+        <module>integration-test/spark3</module>
+      </modules>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>com.tencent.rss</groupId>
+            <artifactId>rss-client-spark3</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>*</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+            </exclusions>
           </dependency>
           <dependency>
             <groupId>com.tencent.rss</groupId>