Posted to commits@celeborn.apache.org by ch...@apache.org on 2023/09/07 08:25:18 UTC

[incubator-celeborn] branch main updated: [CELEBORN-913] Implement method ShuffleDriverComponents#supportsReliableStorage

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e01aac50 [CELEBORN-913] Implement method ShuffleDriverComponents#supportsReliableStorage
9e01aac50 is described below

commit 9e01aac501862c445b72a91c188d1ecd704d501e
Author: zhouyifan279 <zh...@gmail.com>
AuthorDate: Thu Sep 7 16:25:09 2023 +0800

    [CELEBORN-913] Implement method ShuffleDriverComponents#supportsReliableStorage
    
    ### What changes were proposed in this pull request?
    As the title states: implement `ShuffleDriverComponents#supportsReliableStorage` in the Spark 3 client so Spark can recognize Celeborn as reliable shuffle storage.
    
    ### Why are the changes needed?
    See https://issues.apache.org/jira/browse/SPARK-42689
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users need to set `spark.shuffle.sort.io.plugin.class` to `org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO` to enable this feature.
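
    For example, a minimal driver-side setup could look like the sketch below. This is illustrative only; the `spark.shuffle.manager` value is the usual Celeborn Spark 3 shuffle manager and is assumed here rather than introduced by this patch.

    ```java
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class EnableCelebornShuffleDataIO {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("celeborn-reliable-storage-demo")
            // Assumed: the standard Celeborn Spark 3 shuffle manager, configured separately.
            .set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
            // Added by this patch: lets the driver report reliable shuffle storage (SPARK-42689).
            .set("spark.shuffle.sort.io.plugin.class",
                "org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO");

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        spark.range(0, 1000).groupBy("id").count().show(); // any shuffle-producing job
        spark.stop();
      }
    }
    ```

    With force fallback disabled on the Celeborn side, the driver then reports `supportsReliableStorage() == true`, telling Spark that shuffle data survives executor loss (see SPARK-42689).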
    
    ### How was this patch tested?
    Added a new matrix dimension, `shuffle-plugin-class`, to the GitHub CI workflows so the Spark tests run against both `LocalDiskShuffleDataIO` and `CelebornShuffleDataIO`.
    
    Closes #1884 from zhouyifan279/spark-driver-component.
    
    Authored-by: zhouyifan279 <zh...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .github/workflows/maven.yml                        |  5 +-
 .github/workflows/sbt.yml                          |  5 +-
 .../shuffle/celeborn/CelebornShuffleDataIO.java    | 68 ++++++++++++++++++++++
 pom.xml                                            |  4 ++
 project/CelebornBuild.scala                        |  5 ++
 5 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 06aa5cb04..5a3c0acc1 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -95,6 +95,9 @@ jobs:
           - '3.3'
           - '3.4'
           - '3.5'
+        shuffle-plugin-class:
+          - 'org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO'
+          - 'org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO'
         exclude:
           # SPARK-33772: Spark supports JDK 17 since 3.3.0
           - java: 17
@@ -119,7 +122,7 @@ jobs:
         PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
         TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
         build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
-        build/mvn $PROFILES -pl $TEST_MODULES test
+        build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
     - name: Upload test log
       if: failure()
       uses: actions/upload-artifact@v3
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index f6271e63e..4803422bf 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -93,6 +93,9 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
+        shuffle-plugin-class:
+          - 'org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO'
+          - 'org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO'
         include:
           # Spark 3.0
           - spark: '3.0'
@@ -190,7 +193,7 @@ jobs:
         check-latest: false
     - name: Test with SBT
       run: |
-        build/sbt -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean; celeborn-spark-group/test"
+        build/sbt -Dspark.shuffle.plugin.class=${{ matrix.shuffle-plugin-class }} -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean; celeborn-spark-group/test"
     - name: Upload test log
       if: failure()
       uses: actions/upload-artifact@v3
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java
new file mode 100644
index 000000000..4e5681437
--- /dev/null
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.celeborn;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class CelebornShuffleDataIO implements ShuffleDataIO {
+
+  private static final Logger logger = LoggerFactory.getLogger(CelebornShuffleDataIO.class);
+
+  private final SparkConf sparkConf;
+  private final CelebornConf celebornConf;
+
+  public CelebornShuffleDataIO(SparkConf sparkConf) {
+    logger.info("Loading CelebornShuffleDataIO");
+    this.sparkConf = sparkConf;
+    this.celebornConf = SparkUtils.fromSparkConf(sparkConf);
+  }
+
+  @Override
+  public ShuffleExecutorComponents executor() {
+    // Used when fallback to Spark SortShuffleManager
+    return new LocalDiskShuffleExecutorComponents(sparkConf);
+  }
+
+  @Override
+  public ShuffleDriverComponents driver() {
+    return new CelebornShuffleDriverComponents(celebornConf);
+  }
+}
+
+class CelebornShuffleDriverComponents extends LocalDiskShuffleDriverComponents {
+
+  private final boolean supportsReliableStorage;
+
+  public CelebornShuffleDriverComponents(CelebornConf celebornConf) {
+    this.supportsReliableStorage = !celebornConf.shuffleForceFallbackEnabled();
+  }
+
+  // Omitting @Override annotation to avoid compile error before Spark 3.5.0
+  public boolean supportsReliableStorage() {
+    return supportsReliableStorage;
+  }
+}
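
For reference, the new driver components can also be exercised on their own. The sketch below is illustrative and not part of this commit: it assumes Spark 3.5 or later, where `supportsReliableStorage()` is declared on the `ShuffleDriverComponents` interface, and it assumes `spark.celeborn.shuffle.forceFallback.enabled` is the Spark-side spelling of the switch read by `CelebornConf#shuffleForceFallbackEnabled`.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.api.ShuffleDriverComponents;
import org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO;

public class SupportsReliableStorageCheck {
  public static void main(String[] args) {
    // Default: force fallback is off, so Celeborn handles the shuffle and storage is reliable.
    SparkConf conf = new SparkConf()
        .set("spark.shuffle.sort.io.plugin.class",
            "org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO");
    ShuffleDriverComponents driver = new CelebornShuffleDataIO(conf).driver();
    System.out.println("supportsReliableStorage = " + driver.supportsReliableStorage()); // expected: true

    // With force fallback enabled, shuffle goes through Spark's local-disk SortShuffleManager,
    // so reliable storage is no longer advertised. The config key spelling is an assumption.
    SparkConf fallbackConf = conf.clone()
        .set("spark.celeborn.shuffle.forceFallback.enabled", "true");
    ShuffleDriverComponents fallback = new CelebornShuffleDataIO(fallbackConf).driver();
    System.out.println("supportsReliableStorage = " + fallback.supportsReliableStorage()); // expected: false
  }
}
```
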
diff --git a/pom.xml b/pom.xml
index cab6c1722..3af1d24d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,8 @@
       --add-opens=java.base/sun.security.action=ALL-UNNAMED
       --add-opens=java.base/sun.util.calendar=ALL-UNNAMED</extraJavaTestArgs>
 
+    <spark.shuffle.plugin.class>org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO</spark.shuffle.plugin.class>
+
     <mavenCentralId>central</mavenCentralId>
     <mavenCentralName>Maven Central</mavenCentralName>
     <mavenCentralUrl>https://repo.maven.apache.org/maven2</mavenCentralUrl>
@@ -569,6 +571,7 @@
               <log4j2.configurationFile>src/test/resources/log4j2-test.xml</log4j2.configurationFile>
               <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
               <spark.driver.memory>1g</spark.driver.memory>
+              <spark.shuffle.sort.io.plugin.class>${spark.shuffle.plugin.class}</spark.shuffle.sort.io.plugin.class>
             </systemProperties>
             <environmentVariables>
               <CELEBORN_LOCAL_HOSTNAME>localhost</CELEBORN_LOCAL_HOSTNAME>
@@ -606,6 +609,7 @@
               <log4j2.configurationFile>src/test/resources/log4j2-test.xml</log4j2.configurationFile>
               <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
               <spark.driver.memory>1g</spark.driver.memory>
+              <spark.shuffle.sort.io.plugin.class>${spark.shuffle.plugin.class}</spark.shuffle.sort.io.plugin.class>
             </systemProperties>
             <environmentVariables>
               <CELEBORN_LOCAL_HOSTNAME>localhost</CELEBORN_LOCAL_HOSTNAME>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 41afeba51..6aa6e872b 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -179,6 +179,11 @@ object CelebornCommonSettings {
       "-Xmx4g"
     ),
 
+    Test / javaOptions ++= Seq(
+      "-Dspark.shuffle.sort.io.plugin.class="
+        + sys.props.getOrElse("spark.shuffle.plugin.class", "org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO"),
+    ),
+
     Test / envVars += ("IS_TESTING", "1")
   )