You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:44 UTC

[incubator-uniffle] branch master created (now 166f3f8)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


      at 166f3f8  [Improvement] Modify configuration template (#209)

This branch includes the following new commits:

     new 46b62b2  [Improvement] Avoid using the default forkjoin pool by parallelStream directly (#180)
     new 7fa8b52  [Bugfix] Fix spark2 executor stop NPE problem (#187)
     new 924dac7  [Bugfix] Fix spark2 executor stop NPE problem (#186)
     new 11a8594  [Doc] Update readme with features like multiple remote storage support etc (#191)
     new 8d8e6bf  upgrade to 0.6.0-snapshot (#190)
     new cf731f2  [Bugfix] Fix MR don't have remote storage information when we use dynamic conf and MEMORY_LOCALE_HDFS storageType (#195)
     new d92208d  [Minor] Remove serverNode from tags structure when heartbeart timeout (#193)
     new 6bdf49e  [Improvement] Check ADAPTIVE_EXECUTION_ENABLED in RssShuffleManager (#197)
     new a253b1f  [Improvement] Add dynamic allocation patch for Spark 3.2 (#199)
     new 8b5f363  [MINOR] Close clusterManager resources (#202)
     new 392c881  Support build_distribution.sh to specify different mvn build options for Spark2 and Spark3 (#203)
     new 2c1c554  [Improvement] Move detailed client configuration to individual doc (#201)
     new 6937631  [Improvement] Add RSS_IP environment variable support for K8S (#204)
     new 15a6ea6  [Improvement] Close coordinatorClients when DelegationRssShuffleManager stops (#205)
     new ba47aa0  [Minor] Make clearResourceThread and processEventThread daemon (#207)
     new 5ec04b8  Support using remote fs path to specify the excludeNodesFilePath (#200)
     new 166f3f8  [Improvement] Modify configuration template (#209)

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-uniffle] 16/17: Support using remote fs path to specify the excludeNodesFilePath (#200)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 5ec04b89348ca9c28c9ddce571ffa528969d2f8a
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Thu Jun 30 19:12:36 2022 +0800

    Support using remote fs path to specify the excludeNodesFilePath (#200)
    
    What changes were proposed in this pull request?
    Support using remote fs path to specify the excludeNodesFilePath
    
    Why are the changes needed?
    When two coordinators are serving online, we want them to read a consistent exclude nodes file instead of relying on a local file that is synced manually.
    
    Does this PR introduce any user-facing change?
    Yes. It's an incompatible change.
    
    When the default fs in core-site.xml is HDFS and the excludeFilePath is specified as "/user/xxxxx" in the coordinator server,
    then after this patch is applied the filesystem will be initialized to the remote HDFS because the path lacks a scheme.
    
    How was this patch tested?
    Unit tests.
---
 .../rss/coordinator/ClusterManagerFactory.java     | 10 +++-
 .../tencent/rss/coordinator/CoordinatorServer.java |  2 +-
 .../rss/coordinator/SimpleClusterManager.java      | 68 +++++++++++++---------
 .../coordinator/BasicAssignmentStrategyTest.java   |  6 +-
 .../PartitionBalanceAssignmentStrategyTest.java    |  6 +-
 .../rss/coordinator/SimpleClusterManagerTest.java  | 13 +++--
 6 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java
index 2ec2b12..b2723f9 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java
@@ -18,15 +18,19 @@
 
 package com.tencent.rss.coordinator;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class ClusterManagerFactory {
 
   CoordinatorConf conf;
+  Configuration hadoopConf;
 
-  public ClusterManagerFactory(CoordinatorConf conf) {
+  public ClusterManagerFactory(CoordinatorConf conf, Configuration hadoopConf) {
     this.conf = conf;
+    this.hadoopConf = hadoopConf;
   }
 
-  public ClusterManager getClusterManager() {
-    return new SimpleClusterManager(conf);
+  public ClusterManager getClusterManager() throws Exception {
+    return new SimpleClusterManager(conf, hadoopConf);
   }
 }
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java b/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
index 3b79221..2dbe06f 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
@@ -111,7 +111,7 @@ public class CoordinatorServer {
     registerMetrics();
     this.applicationManager = new ApplicationManager(coordinatorConf);
 
-    ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf);
+    ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf, new Configuration());
     this.clusterManager = clusterManagerFactory.getClusterManager();
     this.clientConfManager = new ClientConfManager(coordinatorConf, new Configuration(), applicationManager);
     AssignmentStrategyFactory assignmentStrategyFactory =
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
index fcfd1dc..972ea5f 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
@@ -19,9 +19,10 @@
 package com.tencent.rss.coordinator;
 
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +37,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +57,9 @@ public class SimpleClusterManager implements ClusterManager {
   private int shuffleNodesMax;
   private ScheduledExecutorService scheduledExecutorService;
   private ScheduledExecutorService checkNodesExecutorService;
+  private FileSystem hadoopFileSystem;
 
-  public SimpleClusterManager(CoordinatorConf conf) {
+  public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws IOException {
     this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
@@ -65,6 +71,7 @@ public class SimpleClusterManager implements ClusterManager {
 
     String excludeNodesPath = conf.getString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, "");
     if (!StringUtils.isEmpty(excludeNodesPath)) {
+      this.hadoopFileSystem = CoordinatorUtils.getFileSystemForPath(new Path(excludeNodesPath), hadoopConf);
       long updateNodesInterval = conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL);
       checkNodesExecutorService = Executors.newSingleThreadScheduledExecutor(
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateExcludeNodes-%d").build());
@@ -100,39 +107,37 @@ public class SimpleClusterManager implements ClusterManager {
 
   private void updateExcludeNodes(String path) {
     try {
-      File excludeNodesFile = new File(path);
-      if (excludeNodesFile.exists()) {
-        // don't parse same file twice
-        if (excludeLastModify.get() != excludeNodesFile.lastModified()) {
-          parseExcludeNodesFile(excludeNodesFile);
+      Path hadoopPath = new Path(path);
+      FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
+      if (fileStatus != null && fileStatus.isFile()) {
+        if (excludeLastModify.get() != fileStatus.getModificationTime()) {
+          parseExcludeNodesFile(hadoopFileSystem.open(hadoopPath));
+          excludeLastModify.set(fileStatus.getModificationTime());
         }
       } else {
         excludeNodes = Sets.newConcurrentHashSet();
       }
       CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size());
+    } catch (FileNotFoundException fileNotFoundException) {
+      excludeNodes = Sets.newConcurrentHashSet();
     } catch (Exception e) {
-      LOG.warn("Error when update exclude nodes", e);
+      LOG.warn("Error when updating exclude nodes, the exclude nodes file path: " + path, e);
     }
   }
 
-  private void parseExcludeNodesFile(File excludeNodesFile) {
-    try {
-      Set<String> nodes = Sets.newConcurrentHashSet();
-      try (BufferedReader br = new BufferedReader(new FileReader(excludeNodesFile))) {
-        String line;
-        while ((line = br.readLine()) != null) {
-          if (!StringUtils.isEmpty(line)) {
-            nodes.add(line);
-          }
+  private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws IOException {
+    Set<String> nodes = Sets.newConcurrentHashSet();
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (!StringUtils.isEmpty(line)) {
+          nodes.add(line);
         }
       }
-      // update exclude nodes and last modify time
-      excludeNodes = nodes;
-      excludeLastModify.set(excludeNodesFile.lastModified());
-      LOG.info("Update exclude nodes and " + excludeNodes.size() + " nodes was marked as exclude nodes");
-    } catch (Exception e) {
-      LOG.warn("Error when parse file " + excludeNodesFile.getAbsolutePath(), e);
     }
+    // update exclude nodes and last modify time
+    excludeNodes = nodes;
+    LOG.info("Updated exclude nodes and " + excludeNodes.size() + " nodes were marked as exclude nodes");
   }
 
   @Override
@@ -186,18 +191,23 @@ public class SimpleClusterManager implements ClusterManager {
     servers.clear();
   }
 
+  @Override
+  public int getShuffleNodesMax() {
+    return shuffleNodesMax;
+  }
+
   @Override
   public void close() throws IOException {
+    if (hadoopFileSystem != null) {
+      hadoopFileSystem.close();
+    }
+
     if (scheduledExecutorService != null) {
       scheduledExecutorService.shutdown();
     }
+
     if (checkNodesExecutorService != null) {
       checkNodesExecutorService.shutdown();
     }
   }
-
-  @Override
-  public int getShuffleNodesMax() {
-    return shuffleNodesMax;
-  }
 }
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
index 7a95d76..c316c62 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,10 +45,10 @@ public class BasicAssignmentStrategyTest {
   private int shuffleNodesMax = 7;
 
   @BeforeEach
-  public void setUp() {
+  public void setUp() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc);
+    clusterManager = new SimpleClusterManager(ssc, new Configuration());
     strategy = new BasicAssignmentStrategy(clusterManager);
   }
 
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
index 9ca4146..196f6dd 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,10 +45,10 @@ public class PartitionBalanceAssignmentStrategyTest {
   private Set<String> tags = Sets.newHashSet("test");
 
   @BeforeEach
-  public void setUp() {
+  public void setUp() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc);
+    clusterManager = new SimpleClusterManager(ssc, new Configuration());
     strategy = new PartitionBalanceAssignmentStrategy(clusterManager);
   }
 
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
index fc90d9e..cfa875d 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
@@ -22,11 +22,14 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -53,7 +56,7 @@ public class SimpleClusterManagerTest {
   public void getServerListTest() throws IOException {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
         10, testTags, true);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
@@ -108,7 +111,7 @@ public class SimpleClusterManagerTest {
   public void heartbeatTimeoutTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
     Thread t = new Thread(() -> {
       for (int i = 0; i < 3; i++) {
         if (i == 2) {
@@ -152,7 +155,7 @@ public class SimpleClusterManagerTest {
   public void testGetCorrectServerNodesWhenOneNodeRemoved() throws IOException {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
             10, testTags, true);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
@@ -181,13 +184,13 @@ public class SimpleClusterManagerTest {
     String excludeNodesFolder = (new File(ClassLoader.getSystemResource("empty").getFile())).getParent();
     String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
     CoordinatorConf ssc = new CoordinatorConf();
-    ssc.setString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, excludeNodesPath);
+    ssc.setString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, URI.create(excludeNodesPath).toString());
     ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 2000);
 
     Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
     writeExcludeHosts(excludeNodesPath, nodes);
 
-    SimpleClusterManager scm = new SimpleClusterManager(ssc);
+    SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration());
     scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
         10, testTags, true));
     scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,


[incubator-uniffle] 04/17: [Doc] Update readme with features like multiple remote storage support etc (#191)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 11a8594e868db3aaf55af9baa1903e8cbd17413e
Author: Colin <co...@tencent.com>
AuthorDate: Wed Jun 22 16:38:27 2022 +0800

    [Doc] Update readme with features like multiple remote storage support etc (#191)
    
    What changes were proposed in this pull request?
    Update Readme for latest features, eg, multiple remote storage support, dynamic client conf etc.
    
    Why are the changes needed?
    Doc should be updated
    
    Does this PR introduce any user-facing change?
    No
    
    How was this patch tested?
    No need
---
 README.md | 46 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 34 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index e134f0f..50903ce 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ Coordinator will collect status of shuffle server and do the assignment for the
 
 Shuffle server will receive the shuffle data, merge them and write to storage.
 
-Depend on different situation, Firestorm supports Memory & Local, Memory & Remote Storage(eg, HDFS), Local only, Remote Storage only.
+Depend on different situation, Firestorm supports Memory & Local, Memory & Remote Storage(eg, HDFS), Memory & Local & Remote Storage(recommendation for production environment).
 
 ## Shuffle Process with Firestorm
 
@@ -74,9 +74,25 @@ rss-xxx.tgz will be generated for deployment
      rss.coordinator.server.heartbeat.timeout 30000
      rss.coordinator.app.expired 60000
      rss.coordinator.shuffle.nodes.max 5
-     rss.coordinator.exclude.nodes.file.path RSS_HOME/conf/exclude_nodes
-   ```
-4. start Coordinator
+     # enable dynamicClientConf, and coordinator will be responsible for most of client conf
+     rss.coordinator.dynamicClientConf.enabled true
+     # config the path of client conf
+     rss.coordinator.dynamicClientConf.path <RSS_HOME>/conf/dynamic_client.conf
+     # config the path of excluded shuffle server
+     rss.coordinator.exclude.nodes.file.path <RSS_HOME>/conf/exclude_nodes
+   ```
+4. update <RSS_HOME>/conf/dynamic_client.conf, rss client will get default conf from coordinator eg,
+   ```
+    # MEMORY_LOCALFILE_HDFS is recommandation for production environment
+    rss.storage.type MEMORY_LOCALFILE_HDFS
+    # multiple remote storages are supported, and client will get assignment from coordinator
+    rss.coordinator.remote.storage.path hdfs://cluster1/path,hdfs://cluster2/path
+    rss.writer.require.memory.retryMax 1200
+    rss.client.retry.max 100
+    rss.writer.send.check.timeout 600000
+    rss.client.read.buffer.size 14m
+   ```
+5. start Coordinator
    ```
     bash RSS_HOME/bin/start-coordnator.sh
    ```
@@ -90,14 +106,17 @@ rss-xxx.tgz will be generated for deployment
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="80g"
    ```
-3. update RSS_HOME/conf/server.conf, the following demo is for memory + local storage only, eg,
+3. update RSS_HOME/conf/server.conf, eg,
    ```
      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.rpc.executor.size 2000
-     rss.storage.type MEMORY_LOCALFILE
+     # it should be configed the same as in coordinator
+     rss.storage.type MEMORY_LOCALFILE_HDFS
      rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
+     # local storage path for shuffle server
      rss.storage.basePath /data1/rssdata,/data2/rssdata....
+     # it's better to config thread num according to local disk num
      rss.server.flush.thread.alive 5
      rss.server.flush.threadPool.size 10
      rss.server.buffer.capacity 40g
@@ -108,6 +127,10 @@ rss-xxx.tgz will be generated for deployment
      rss.server.preAllocation.expired 120000
      rss.server.commit.timeout 600000
      rss.server.app.expired.withoutHeartbeat 120000
+     # note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
+     # there will be no data written to DFS if set it as 100g even rss.storage.type=MEMORY_LOCALFILE_HDFS
+     # please set proper value if DFS is used, eg, 64m, 128m.
+     rss.server.flush.cold.storage.threshold.size 100g
    ```
 4. start Shuffle Server
    ```
@@ -121,12 +144,11 @@ rss-xxx.tgz will be generated for deployment
 
    The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar
 
-2. Update Spark conf to enable Firestorm, the following demo is for local storage only, eg,
+2. Update Spark conf to enable Firestorm, eg,
 
    ```
    spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
    spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
-   spark.rss.storage.type MEMORY_LOCALFILE
    ```
 
 ### Support Spark dynamic allocation
@@ -140,17 +162,16 @@ After apply the patch and rebuild spark, add following configuration in spark co
   spark.dynamicAllocation.enabled true
   ```
 
-## Deploy MapReduce Client
+### Deploy MapReduce Client
 
 1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
 
 The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar
 
-2. Update MapReduce conf to enable Firestorm, the following demo is for local storage only, eg,
+2. Update MapReduce conf to enable Firestorm, eg,
 
    ```
    -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
-   -Dmapreduce.rss.storage.type=MEMORY_LOCALFILE 
    -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
    -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
    -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
@@ -168,9 +189,10 @@ The important configuration is listed as following.
 |Property Name|Default|	Description|
 |---|---|---|
 |rss.coordinator.server.heartbeat.timeout|30000|Timeout if can't get heartbeat from shuffle server|
-|rss.coordinator.assignment.strategy|BASIC|Strategy for assigning shuffle server, only BASIC support|
+|rss.coordinator.assignment.strategy|PARTITION_BALANCE|Strategy for assigning shuffle server, PARTITION_BALANCE should be used for workload balance|
 |rss.coordinator.app.expired|60000|Application expired time (ms), the heartbeat interval should be less than it|
 |rss.coordinator.shuffle.nodes.max|9|The max number of shuffle server when do the assignment|
+|rss.coordinator.dynamicClientConf.path|-|The path of configuration file which have default conf for rss client|
 |rss.coordinator.exclude.nodes.file.path|-|The path of configuration file which have exclude nodes|
 |rss.coordinator.exclude.nodes.check.interval.ms|60000|Update interval (ms) for exclude nodes|
 |rss.rpc.server.port|-|RPC port for coordinator|


[incubator-uniffle] 17/17: [Improvement] Modify configuration template (#209)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 166f3f8c7c5f14eb75daca843f992e908bd3c938
Author: roryqi <je...@gmail.com>
AuthorDate: Fri Jul 1 11:49:34 2022 +0800

    [Improvement] Modify configuration template (#209)
    
    ### What changes were proposed in this pull request?
    I modified the files `conf/server.conf` and `conf/coordinator.conf`. Some of the configurations in them were not recommended, so I changed them.
    
    ### Why are the changes needed?
    Give users a better configuration template
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No need.
---
 conf/coordinator.conf |  2 +-
 conf/server.conf      | 16 ++++------------
 2 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/conf/coordinator.conf b/conf/coordinator.conf
index 294f14e..c66e302 100644
--- a/conf/coordinator.conf
+++ b/conf/coordinator.conf
@@ -21,4 +21,4 @@ rss.jetty.http.port 19998
 rss.coordinator.server.heartbeat.timeout 30000
 rss.coordinator.app.expired 60000
 rss.coordinator.shuffle.nodes.max 13
-rss.coordinator.exclude.nodes.file.path /xxx
+rss.coordinator.exclude.nodes.file.path file:///xxx
diff --git a/conf/server.conf b/conf/server.conf
index 3c347e1..6ab6571 100644
--- a/conf/server.conf
+++ b/conf/server.conf
@@ -19,18 +19,10 @@
 rss.rpc.server.port 19999
 rss.jetty.http.port 19998
 rss.storage.basePath /xxx,/xxx
-rss.storage.type LOCALFILE_AND_HDFS
+rss.storage.type MEMORY_LOCALFILE_HDFS
 rss.coordinator.quorum xxx:19999,xxx:19999
 rss.server.buffer.capacity 40gb
-rss.server.buffer.spill.threshold 22gb
-rss.server.partition.buffer.size 150mb
 rss.server.read.buffer.capacity 20gb
-rss.server.flush.thread.alive 50
-rss.server.flush.threadPool.size 100
-
-# multistorage config
-rss.server.multistorage.enable true
-rss.server.uploader.enable true
-rss.server.uploader.base.path hdfs://xxx
-rss.server.uploader.thread.number 32
-rss.server.disk.capacity 1011550697553
+rss.server.flush.thread.alive 5
+rss.server.flush.threadPool.size 10
+rss.server.disk.capacity 1t


[incubator-uniffle] 14/17: [Improvement] Close coordinatorClients when DelegationRssShuffleManager stops (#205)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 15a6ea65ede6a2bc07824855801573a5d0cad512
Author: Zhen Wang <64...@qq.com>
AuthorDate: Thu Jun 30 11:34:40 2022 +0800

    [Improvement] Close coordinatorClients when DelegationRssShuffleManager stops (#205)
    
    ### What changes were proposed in this pull request?
    Close coordinatorClients when DelegationRssShuffleManager stops.
    
    ### Why are the changes needed?
    The coordinatorClients in DelegationRssShuffleManager are never closed.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No
---
 .../main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java  | 1 +
 .../main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java  | 1 +
 2 files changed, 2 insertions(+)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index e0a30e7..03320c0 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -173,6 +173,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
   @Override
   public void stop() {
     delegate.stop();
+    coordinatorClients.forEach(CoordinatorClient::close);
   }
 
   @Override
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 4ed6cce..32d58d2 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -248,6 +248,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
   @Override
   public void stop() {
     delegate.stop();
+    coordinatorClients.forEach(CoordinatorClient::close);
   }
 
   @Override


[incubator-uniffle] 13/17: [Improvement] Add RSS_IP environment variable support for K8S (#204)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 6937631876052425b8d808d26caf78c79b24536a
Author: roryqi <je...@gmail.com>
AuthorDate: Wed Jun 29 10:06:31 2022 +0800

    [Improvement] Add RSS_IP environment variable support for K8S (#204)
    
    ### What changes were proposed in this pull request?
    Method `getHostIp` can acquire IP by environment variable.
    
    ### Why are the changes needed?
    For K8S, there are too many IPs, so it is hard to decide which one to use. We therefore use an environment variable to tell RSS which one to use.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    UT
---
 .../java/com/tencent/rss/common/util/RssUtils.java | 10 +++++++++
 .../com/tencent/rss/common/util/RssUtilsTest.java  | 26 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/common/src/main/java/com/tencent/rss/common/util/RssUtils.java b/common/src/main/java/com/tencent/rss/common/util/RssUtils.java
index 1b7200e..7ecae6b 100644
--- a/common/src/main/java/com/tencent/rss/common/util/RssUtils.java
+++ b/common/src/main/java/com/tencent/rss/common/util/RssUtils.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import com.google.common.collect.Lists;
+import com.google.common.net.InetAddresses;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +103,15 @@ public class RssUtils {
   // loop back, etc.). If the network interface in the machine is more than one, we
   // will choose the first IP.
   public static String getHostIp() throws Exception {
+    // For K8S, there are too many IPs, it's hard to decide which we should use.
+    // So we use the environment variable to tell RSS to use which one.
+    String ip = System.getenv("RSS_IP");
+    if (ip != null) {
+      if (!InetAddresses.isInetAddress(ip)) {
+        throw new RuntimeException("Environment RSS_IP: " + ip + " is wrong format");
+      }
+      return ip;
+    }
     Enumeration<NetworkInterface> nif = NetworkInterface.getNetworkInterfaces();
     String siteLocalAddress = null;
     while (nif.hasMoreElements()) {
diff --git a/common/src/test/java/com/tencent/rss/common/util/RssUtilsTest.java b/common/src/test/java/com/tencent/rss/common/util/RssUtilsTest.java
index 95fd55f..220cb5c 100644
--- a/common/src/test/java/com/tencent/rss/common/util/RssUtilsTest.java
+++ b/common/src/test/java/com/tencent/rss/common/util/RssUtilsTest.java
@@ -18,6 +18,7 @@
 
 package com.tencent.rss.common.util;
 
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -62,6 +63,18 @@ public class RssUtilsTest {
       if (!address.equals("127.0.0.1")) {
         assertEquals(address, realIp);
       }
+      setEnv("RSS_IP", "8.8.8.8");
+      assertEquals("8.8.8.8", RssUtils.getHostIp());
+      setEnv("RSS_IP", "xxxx");
+      boolean isException = false;
+      try {
+        RssUtils.getHostIp();
+      } catch (Exception e) {
+        isException = true;
+      }
+      setEnv("RSS_IP", realIp);
+      RssUtils.getHostIp();
+      assertTrue(isException);
     } catch (Exception e) {
       fail(e.getMessage());
     }
@@ -185,6 +198,19 @@ public class RssUtilsTest {
     }
   }
 
+  public static void setEnv(String key, String value) {
+    try {
+      Map<String, String> env = System.getenv();
+      Class<?> cl = env.getClass();
+      Field field = cl.getDeclaredField("m");
+      field.setAccessible(true);
+      Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+      writableEnv.put(key, value);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to set environment variable", e);
+    }
+  }
+
   public static class RssUtilTestDummySuccess implements RssUtilTestDummy {
     private final String s;
 


[incubator-uniffle] 02/17: [Bugfix] Fix spark2 executor stop NPE problem (#187)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 7fa8b52e5739a0c2ded7f2eca84b086713765418
Author: roryqi <je...@gmail.com>
AuthorDate: Wed Jun 22 14:30:15 2022 +0800

    [Bugfix] Fix spark2 executor stop NPE problem (#187)
    
    backport 0.5.0
    
    ### What changes were proposed in this pull request?
    We need to check whether heartBeatScheduledExecutorService is null before stopping it.
    
    ### Why are the changes needed?
    PR #177 introduced this problem: when we run Spark applications on our cluster, the executor throws an NPE when the `stop` method is called.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test
---
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5d11c39..8a2c385 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -373,7 +373,9 @@ public class RssShuffleManager implements ShuffleManager {
 
   @Override
   public void stop() {
-    heartBeatScheduledExecutorService.shutdownNow();
+    if (heartBeatScheduledExecutorService != null) {
+      heartBeatScheduledExecutorService.shutdownNow();
+    }
     threadPoolExecutor.shutdownNow();
     shuffleWriteClient.close();
   }


[incubator-uniffle] 01/17: [Improvement] Avoid using the default forkjoin pool by parallelStream directly (#180)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 46b62b2406a547dca6f6b933ee187047e3618202
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Tue Jun 21 14:15:59 2022 +0800

    [Improvement] Avoid using the default forkjoin pool by parallelStream directly (#180)
    
    ### What changes were proposed in this pull request?
    parallelStream uses the default ForkJoinPool, which is shared across the entire JVM. To avoid this, use a custom pool and allow its size to be configured.
    
    ### Why are the changes needed?
    Use a separate ForkJoinPool to send shuffle data.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it introduces a configuration option to control the size of the ForkJoinPool:
    mapreduce.rss.client.data.transfer.pool.size for MapReduce
    spark.rss.client.data.transfer.pool.size for Spark
    
    ### How was this patch tested?
    GA passed.
---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   |  4 ++++
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    |  5 ++++-
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  4 ++++
 .../apache/spark/shuffle/RssShuffleManager.java    |  5 ++++-
 .../apache/spark/shuffle/RssShuffleManager.java    | 14 ++++++++++---
 .../rss/client/factory/ShuffleClientFactory.java   |  4 ++--
 .../rss/client/impl/ShuffleWriteClientImpl.java    | 24 ++++++++++++++--------
 .../tencent/rss/client/util/RssClientConfig.java   |  2 ++
 .../client/impl/ShuffleWriteClientImplTest.java    |  2 +-
 .../test/java/com/tencent/rss/test/QuorumTest.java |  2 +-
 .../tencent/rss/test/ShuffleServerGrpcTest.java    |  2 +-
 .../tencent/rss/test/ShuffleWithRssClientTest.java |  2 +-
 12 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index a191e2f..3447f09 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -52,6 +52,10 @@ public class RssMRConfig {
       RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
       MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE =
+          MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+  public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
+          RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
   public static final String RSS_CLIENT_SEND_THREAD_NUM =
       MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM;
   public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM =
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 1d8b4d6..16613e1 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -90,10 +90,13 @@ public class RssMRUtils {
         RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
     boolean replicaSkipEnabled = jobConf.getBoolean(RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
         RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
+    int dataTransferPoolSize = jobConf.getInt(RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+        RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
     ShuffleWriteClient client = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax,
-            heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled);
+            heartBeatThreadNum, replica, replicaWrite, replicaRead, replicaSkipEnabled,
+                dataTransferPoolSize);
     return client;
   }
 
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 9720ff0..8d5dda9 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -106,6 +106,10 @@ public class RssSparkConfig {
   public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE = RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED =
       SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE =
+      SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE;
+  public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE =
+      RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE;
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE =
       RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE;
   public static final String RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE =
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f1f2a36..5d11c39 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -82,6 +82,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaWrite;
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
+  private final int dataTransferPoolSize;
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
   private RemoteStorageInfo remoteStorage;
@@ -144,6 +145,8 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
     this.dataReplicaRead =  sparkConf.getInt(RssSparkConfig.RSS_DATA_REPLICA_READ,
         RssSparkConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
+    this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
     this.dataReplicaSkipEnabled = sparkConf.getBoolean(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED,
         RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
     LOG.info("Check quorum config ["
@@ -167,7 +170,7 @@ public class RssShuffleManager implements ShuffleManager {
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled);
+          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 88a7bf8..1cfacd2 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -82,6 +82,7 @@ public class RssShuffleManager implements ShuffleManager {
   private final int dataReplicaWrite;
   private final int dataReplicaRead;
   private final boolean dataReplicaSkipEnabled;
+  private final int dataTransferPoolSize;
   private ShuffleWriteClient shuffleWriteClient;
   private final Map<String, Set<Long>> taskToSuccessBlockIds;
   private final Map<String, Set<Long>> taskToFailedBlockIds;
@@ -155,7 +156,7 @@ public class RssShuffleManager implements ShuffleManager {
     this.heartbeatInterval = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_INTERVAL,
         RssSparkConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
     this.heartbeatTimeout = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2);
-    int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
+    final int retryMax = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_RETRY_MAX,
         RssSparkConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE,
         RssSparkConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
@@ -167,10 +168,14 @@ public class RssShuffleManager implements ShuffleManager {
         RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
         RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+
+    this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+
     shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled);
+          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     registerCoordinator();
     // fetch client conf and apply them if necessary and disable ESS
     if (isDriver && dynamicConfEnabled) {
@@ -233,10 +238,13 @@ public class RssShuffleManager implements ShuffleManager {
       RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
     int heartBeatThreadNum = sparkConf.getInt(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM,
       RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE);
+    this.dataTransferPoolSize = sparkConf.getInt(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE,
+            RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE);
+
      shuffleWriteClient = ShuffleClientFactory
         .getInstance()
         .createShuffleWriteClient(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled);
+          dataReplica, dataReplicaWrite, dataReplicaRead, dataReplicaSkipEnabled, dataTransferPoolSize);
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.taskToFailedBlockIds = taskToFailedBlockIds;
     if (loop != null) {
diff --git a/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java b/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java
index aefb7b6..bc4b0cf 100644
--- a/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/com/tencent/rss/client/factory/ShuffleClientFactory.java
@@ -37,9 +37,9 @@ public class ShuffleClientFactory {
 
   public ShuffleWriteClient createShuffleWriteClient(
       String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
-      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled) {
+      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize) {
     return new ShuffleWriteClientImpl(clientType, retryMax, retryIntervalMax, heartBeatThreadNum,
-      replica, replicaWrite, replicaRead, replicaSkipEnabled);
+      replica, replicaWrite, replicaRead, replicaSkipEnabled, dataTransferPoolSize);
   }
 
   public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request) {
diff --git a/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java
index d8bfe99..34571f2 100644
--- a/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/com/tencent/rss/client/impl/ShuffleWriteClientImpl.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,19 +88,24 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private int replicaWrite;
   private int replicaRead;
   private boolean replicaSkipEnabled;
+  private int dataTranferPoolSize;
+  private final ForkJoinPool dataTransferPool;
 
   public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
-                                int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled) {
+                                int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
+                                int dataTranferPoolSize) {
     this.clientType = clientType;
     this.retryMax = retryMax;
     this.retryIntervalMax = retryIntervalMax;
-    coordinatorClientFactory = new CoordinatorClientFactory(clientType);
-    heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
+    this.coordinatorClientFactory = new CoordinatorClientFactory(clientType);
+    this.heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
     this.replica = replica;
     this.replicaWrite = replicaWrite;
     this.replicaRead = replicaRead;
     this.replicaSkipEnabled = replicaSkipEnabled;
+    this.dataTranferPoolSize = dataTranferPoolSize;
+    this.dataTransferPool = new ForkJoinPool(dataTranferPoolSize);
   }
 
   private boolean sendShuffleDataAsync(
@@ -110,13 +116,13 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     // If one or more servers is failed, the sending is not totally successful.
     AtomicBoolean isAllServersSuccess = new AtomicBoolean(true);
     if (serverToBlocks != null) {
-      serverToBlocks.entrySet().parallelStream().forEach(entry -> {
+      dataTransferPool.submit(() -> serverToBlocks.entrySet().parallelStream().forEach(entry -> {
         ShuffleServerInfo ssi = entry.getKey();
         try {
           Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = entry.getValue();
           // todo: compact unnecessary blocks that reach replicaWrite
           RssSendShuffleDataRequest request = new RssSendShuffleDataRequest(
-            appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
+                  appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
           long s = System.currentTimeMillis();
           RssSendShuffleDataResponse response = getShuffleServerClient(ssi).sendShuffleData(request);
           LOG.info("ShuffleWriteClientImpl sendShuffleData cost:" + (System.currentTimeMillis() - s));
@@ -125,17 +131,17 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
             // mark a replica of block that has been sent
             serverToBlockIds.get(ssi).forEach(block -> blockIdsTracker.get(block).incrementAndGet());
             LOG.info("Send: " + serverToBlockIds.get(ssi).size()
-              + " blocks to [" + ssi.getId() + "] successfully");
+                    + " blocks to [" + ssi.getId() + "] successfully");
           } else {
             isAllServersSuccess.set(false);
             LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to [" + ssi.getId()
-              + "] failed with statusCode[" + response.getStatusCode() + "], ");
+                    + "] failed with statusCode[" + response.getStatusCode() + "], ");
           }
         } catch (Exception e) {
           isAllServersSuccess.set(false);
           LOG.warn("Send: " + serverToBlockIds.get(ssi).size() + " blocks to [" + ssi.getId() + "] failed.", e);
         }
-      });
+      })).join();
     }
     return isAllServersSuccess.get();
   }
diff --git a/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java b/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java
index e1c6df7..c2ca8fb 100644
--- a/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java
+++ b/client/src/main/java/com/tencent/rss/client/util/RssClientConfig.java
@@ -35,6 +35,8 @@ public class RssClientConfig {
   public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE = 1;
   public static final String RSS_DATA_REPLICA_SKIP_ENABLED = "rss.data.replica.skip.enabled";
   public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = true;
+  public static final String RSS_DATA_TRANSFER_POOL_SIZE = "rss.client.data.transfer.pool.size";
+  public static final int RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE = Runtime.getRuntime().availableProcessors();
   public static final String RSS_HEARTBEAT_INTERVAL = "rss.heartbeat.interval";
   public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE = 10 * 1000L;
   public static final String RSS_HEARTBEAT_TIMEOUT = "rss.heartbeat.timeout";
diff --git a/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java
index 16e889c..e3b4443 100644
--- a/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/com/tencent/rss/client/impl/ShuffleWriteClientImplTest.java
@@ -42,7 +42,7 @@ public class ShuffleWriteClientImplTest {
   @Test
   public void testSendData() {
     ShuffleWriteClientImpl shuffleWriteClient =
-        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true);
+        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1);
     ShuffleServerClient mockShuffleServerClient = mock(ShuffleServerClient.class);
     ShuffleWriteClientImpl spyClient = spy(shuffleWriteClient);
     doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java
index 01614ae..0eeb2f0 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/QuorumTest.java
@@ -259,7 +259,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
       int replica, int replicaWrite, int replicaRead, boolean replicaSkip) {
 
     shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      replica, replicaWrite, replicaRead, replicaSkip);
+      replica, replicaWrite, replicaRead, replicaSkip, 1);
 
     List<ShuffleServerInfo> allServers = Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1,
         shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4);
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java
index b4ea72a..3b30828 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleServerGrpcTest.java
@@ -102,7 +102,7 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
   public void clearResourceTest() throws Exception {
     final ShuffleWriteClient shuffleWriteClient =
         ShuffleClientFactory.getInstance().createShuffleWriteClient(
-            "GRPC", 2, 10000L, 4, 1, 1, 1, true);
+            "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1);
     shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
     shuffleWriteClient.registerShuffle(
         new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java
index 2a6f2d9..a23d3a4 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/ShuffleWithRssClientTest.java
@@ -89,7 +89,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
   @BeforeEach
   public void createClient() {
     shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
-      1, 1, 1, true);
+      1, 1, 1, true, 1);
   }
 
   @AfterEach


[incubator-uniffle] 08/17: [Improvement] Check ADAPTIVE_EXECUTION_ENABLED in RssShuffleManager (#197)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 6bdf49e1a68131545a8385123da558be287a196f
Author: xunxunmimi5577 <52...@users.noreply.github.com>
AuthorDate: Fri Jun 24 02:12:40 2022 +0800

    [Improvement] Check ADAPTIVE_EXECUTION_ENABLED in RssShuffleManager (#197)
    
    ### What changes were proposed in this pull request?
     1. Add checking of spark.sql.adaptive.enabled=false in RssShuffleManager's constructor for spark2.
     2. Add a description of this parameter in the Deploy Spark Client section of the readme.
    
    ### Why are the changes needed?
     When using firestorm + spark2 with spark.sql.adaptive.enabled=true, the result is wrong, but no warning or error is reported.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test
---
 README.md                                                              | 1 +
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java      | 3 +++
 2 files changed, 4 insertions(+)

diff --git a/README.md b/README.md
index 50903ce..0fb65e5 100644
--- a/README.md
+++ b/README.md
@@ -149,6 +149,7 @@ rss-xxx.tgz will be generated for deployment
    ```
    spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
    spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
+   # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
    ```
 
 ### Support Spark dynamic allocation
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 8a2c385..28f1a8d 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -136,6 +136,9 @@ public class RssShuffleManager implements ShuffleManager {
   };
 
   public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
+    if (sparkConf.getBoolean("spark.sql.adaptive.enabled", false)) {
+      throw new IllegalArgumentException("Spark2 doesn't support AQE, spark.sql.adaptive.enabled should be false.");
+    }
     this.sparkConf = sparkConf;
 
     // set & check replica config


[incubator-uniffle] 10/17: [MINOR] Close clusterManager resources (#202)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 8b5f363fa296312042130b73c8dd8f5a15b5e0ae
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Mon Jun 27 17:34:13 2022 +0800

    [MINOR] Close clusterManager resources (#202)
    
    ### What changes were proposed in this pull request?
    1. Rename the `shutdown` method to `close`
    2. Close ClusterManager resources in test cases
    
    ### Why are the changes needed?
    Close resources to reduce resource usage in test cases.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Test cases
---
 .../java/com/tencent/rss/coordinator/ClusterManager.java    |  5 ++---
 .../java/com/tencent/rss/coordinator/CoordinatorServer.java |  2 +-
 .../com/tencent/rss/coordinator/SimpleClusterManager.java   | 10 ++++++++--
 .../rss/coordinator/BasicAssignmentStrategyTest.java        |  5 ++++-
 .../coordinator/PartitionBalanceAssignmentStrategyTest.java |  4 +++-
 .../tencent/rss/coordinator/SimpleClusterManagerTest.java   | 13 +++++++++++--
 .../test/java/com/tencent/rss/test/CoordinatorGrpcTest.java |  1 +
 7 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManager.java
index 4249a03..9f5915e 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManager.java
@@ -18,10 +18,11 @@
 
 package com.tencent.rss.coordinator;
 
+import java.io.Closeable;
 import java.util.List;
 import java.util.Set;
 
-public interface ClusterManager {
+public interface ClusterManager extends Closeable {
 
   /**
    * Add a server to the cluster.
@@ -49,6 +50,4 @@ public interface ClusterManager {
   List<ServerNode> list();
 
   int getShuffleNodesMax();
-
-  void shutdown();
 }
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java b/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
index 7ba7e1c..3b79221 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
@@ -94,7 +94,7 @@ public class CoordinatorServer {
       jettyServer.stop();
     }
     if (clusterManager != null) {
-      clusterManager.shutdown();
+      clusterManager.close();
     }
     if (accessManager != null) {
       accessManager.close();
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
index 10af74d..fcfd1dc 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
@@ -21,6 +21,7 @@ package com.tencent.rss.coordinator;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -186,8 +187,13 @@ public class SimpleClusterManager implements ClusterManager {
   }
 
   @Override
-  public void shutdown() {
-    scheduledExecutorService.shutdown();
+  public void close() throws IOException {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdown();
+    }
+    if (checkNodesExecutorService != null) {
+      checkNodesExecutorService.shutdown();
+    }
   }
 
   @Override
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
index 97afabf..7a95d76 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
@@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.Sets;
 import com.tencent.rss.common.PartitionRange;
+
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -49,8 +51,9 @@ public class BasicAssignmentStrategyTest {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     clusterManager.clear();
+    clusterManager.close();
   }
 
   @Test
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
index 018aa62..9ca4146 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
@@ -18,6 +18,7 @@
 
 package com.tencent.rss.coordinator;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -169,8 +170,9 @@ public class PartitionBalanceAssignmentStrategyTest {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     clusterManager.clear();
+    clusterManager.close();
   }
 
   void updateServerResource(List<Long> resources) {
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
index bed9081..fc90d9e 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
@@ -20,6 +20,7 @@ package com.tencent.rss.coordinator;
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +50,7 @@ public class SimpleClusterManagerTest {
   }
 
   @Test
-  public void getServerListTest() {
+  public void getServerListTest() throws IOException {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
@@ -99,6 +100,8 @@ public class SimpleClusterManagerTest {
     assertTrue(testTagNodes.contains(sn2));
     assertTrue(testTagNodes.contains(sn3));
     assertTrue(testTagNodes.contains(sn4));
+
+    clusterManager.close();
   }
 
   @Test
@@ -141,10 +144,12 @@ public class SimpleClusterManagerTest {
     Thread.sleep(500);
     serverNodes = clusterManager.getServerList(testTags);
     assertEquals(0, serverNodes.size());
+
+    clusterManager.close();
   }
 
   @Test
-  public void testGetCorrectServerNodesWhenOneNodeRemoved() {
+  public void testGetCorrectServerNodesWhenOneNodeRemoved() throws IOException {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
@@ -167,6 +172,8 @@ public class SimpleClusterManagerTest {
     List<ServerNode> serverList = clusterManager.getServerList(testTags);
     Assertions.assertEquals(2, tagToNodes.get(testTags.iterator().next()).size());
     Assertions.assertEquals(2, serverList.size());
+
+    clusterManager.close();
   }
 
   @Test
@@ -231,6 +238,8 @@ public class SimpleClusterManagerTest {
       remainNodes.remove(node.getId());
     }
     assertEquals(0, remainNodes.size());
+
+    scm.close();
   }
 
   private void writeExcludeHosts(String path, Set<String> values) throws Exception {
diff --git a/integration-test/common/src/test/java/com/tencent/rss/test/CoordinatorGrpcTest.java b/integration-test/common/src/test/java/com/tencent/rss/test/CoordinatorGrpcTest.java
index cc11218..b713c00 100644
--- a/integration-test/common/src/test/java/com/tencent/rss/test/CoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/com/tencent/rss/test/CoordinatorGrpcTest.java
@@ -237,6 +237,7 @@ public class CoordinatorGrpcTest extends CoordinatorTestBase {
     shuffleServers.set(0, ss);
     Thread.sleep(3000);
     assertEquals(2, coordinators.get(0).getClusterManager().getNodesNum());
+    scm.close();
   }
 
   @Test


[incubator-uniffle] 06/17: [Bugfix] Fix MR don't have remote storage information when we use dynamic conf and MEMORY_LOCALE_HDFS storageType (#195)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit cf731f24ef3f10bb24c57475131c04355c9d7e64
Author: roryqi <je...@gmail.com>
AuthorDate: Thu Jun 23 09:49:16 2022 +0800

    [Bugfix] Fix MR don't have remote storage information when we use dynamic conf and MEMORY_LOCALE_HDFS storageType (#195)
    
    ### What changes were proposed in this pull request?
    We should acquire the storageType from extraConf.
    ### Why are the changes needed?
    Without this patch, MR doesn't work when we use dynamic conf and the MEMORY_LOCALFILE_HDFS storageType.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test
---
 .../main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 7511104..976b03c 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -180,7 +180,7 @@ public class RssMRAppMaster extends MRAppMaster {
         RssMRUtils.applyDynamicClientConf(extraConf, clusterClientConf);
       }
 
-      String storageType = conf.get(RssMRConfig.RSS_STORAGE_TYPE);
+      String storageType = RssMRUtils.getString(extraConf, conf, RssMRConfig.RSS_STORAGE_TYPE);
       RemoteStorageInfo defaultRemoteStorage =
           new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH, ""));
       RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(


[incubator-uniffle] 12/17: [Improvement] Move detailed client configuration to individual doc (#201)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 2c1c554bb9a47a25e56164d1af2efa1acff66cd8
Author: frankliee <fr...@tencent.com>
AuthorDate: Tue Jun 28 11:02:00 2022 +0800

    [Improvement] Move detailed client configuration to individual doc (#201)
    
     ### What changes were proposed in this pull request?
    1.  Put detailed configuration to doc subdirectory.
    2. Add doc for client quorum setting.
    
    ### Why are the changes needed?
    Update doc
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Just doc.
---
 README.md                 |  22 +------
 docs/client_guide.md      | 148 ++++++++++++++++++++++++++++++++++++++++++++++
 docs/coordinator_guide.md |   8 +++
 docs/index.md             |   8 +++
 docs/pageA.md             |   7 ---
 docs/server_guide.md      |   7 +++
 6 files changed, 173 insertions(+), 27 deletions(-)

diff --git a/README.md b/README.md
index 51a1ed0..eba4fd3 100644
--- a/README.md
+++ b/README.md
@@ -233,27 +233,9 @@ The important configuration is listed as following.
 |rss.server.flush.cold.storage.threshold.size|64M| The threshold of data size for LOACALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used|
 
 
-### Spark Client
+### Shuffle Client
 
-|Property Name|Default|Description|
-|---|---|---|
-|spark.rss.writer.buffer.size|3m|Buffer size for single partition data|
-|spark.rss.writer.buffer.spill.size|128m|Buffer size for total partition data|
-|spark.rss.coordinator.quorum|-|Coordinator quorum|
-|spark.rss.storage.type|-|Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS|
-|spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
-|spark.rss.client.read.buffer.size|32m|The max data size read from storage|
-|spark.rss.client.send.threadPool.size|10|The thread size for send shuffle data to shuffle server|
-
-
-### MapReduce Client
-
-|Property Name|Default|Description|
-|---|---|---|
-|mapreduce.rss.coordinator.quorum|-|Coordinator quorum|
-|mapreduce.rss.storage.type|-|Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS|
-|mapreduce.rss.client.max.buffer.size|3k|The max buffer size in map side|
-|mapreduce.rss.client.read.buffer.size|32m|The max data size read from storage|
+For more details of advanced configuration, please see [Firestorm Shuffle Client Guide](https://github.com/Tencent/Firestorm/blob/master/docs/client_guide.md).
 
 ## LICENSE
 
diff --git a/docs/client_guide.md b/docs/client_guide.md
new file mode 100644
index 0000000..95b960b
--- /dev/null
+++ b/docs/client_guide.md
@@ -0,0 +1,148 @@
+---
+layout: page
+displayTitle: Firestorm Shuffle Client Guide
+title: Firestorm Shuffle Client Guide
+description: Firestorm Shuffle Client Guide
+---
+# Firestorm Shuffle Client Guide
+
+Firestorm is designed as a unified shuffle engine for multiple computing frameworks, including Apache Spark and Apache Hadoop.
+Firestorm has provided pluggable client plugins to enable remote shuffle in Spark and MapReduce.
+
+## Deploy
+This document will introduce how to deploy Firestorm client plugins with Spark and MapReduce.
+
+### Deploy Spark Client Plugin
+
+1. Add client jar to Spark classpath, eg, SPARK_HOME/jars/
+
+   The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-XXXXX-shaded.jar
+
+   The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar
+
+2. Update Spark conf to enable Firestorm, eg,
+
+   ```
+   spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
+   spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
+   # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
+   ```
+
+### Support Spark Dynamic Allocation
+
+To support spark dynamic allocation with Firestorm, spark code should be updated.
+There are 2 patches for spark-2.4.6 and spark-3.1.2 in spark-patches folder for reference.
+
+After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:
+  ```
+  spark.shuffle.service.enabled false
+  spark.dynamicAllocation.enabled true
+  ```
+
+### Deploy MapReduce Client Plugin
+
+1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
+
+The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar
+
+2. Update MapReduce conf to enable Firestorm, eg,
+
+   ```
+   -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
+   -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
+   -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
+   -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
+   ```
+Note that the RssMRAppMaster will automatically disable slow start (i.e., `mapreduce.job.reduce.slowstart.completedmaps=1`)
+and job recovery (i.e., `yarn.app.mapreduce.am.job.recovery.enable=false`)
+
+
+## Configuration
+
+The important client configurations are listed below.
+
+### Common Setting
+These configurations are shared by all types of clients.
+
+|Property Name|Default|Description|
+|---|---|---|
+|<client_type>.rss.coordinator.quorum|-|Coordinator quorum|
+|<client_type>.rss.writer.buffer.size|3m|Buffer size for single partition data|
+|<client_type>.rss.storage.type|-|Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS|
+|<client_type>.rss.client.read.buffer.size|14m|The max data size read from storage|
+|<client_type>.rss.client.send.threadPool.size|5|The thread size for send shuffle data to shuffle server|
+
+Notice:
+
+1. `<client_type>` should be `spark` or `mapreduce`
+
+2. `<client_type>.rss.coordinator.quorum` is compulsory, and other configurations are optional when coordinator dynamic configuration is enabled.
+
+### Adaptive Remote Shuffle Enabling 
+
+To choose between built-in shuffle and remote shuffle in a smart manner, Firestorm supports adaptive enabling.
+The client should use `DelegationRssShuffleManager` and provide its unique <access_id> so that the coordinator could distinguish whether it should enable remote shuffle.
+
+```
+spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager
+spark.rss.access.id=<access_id> 
+```
+
+Notice:
+Currently, this feature only supports Spark. 
+
+Other configuration:
+
+|Property Name|Default|Description|
+|---|---|---|
+|spark.rss.access.timeout.ms|10000|The timeout to access Firestorm coordinator|
+  
+
+### Client Quorum Setting 
+
+Firestorm supports a client-side quorum protocol to tolerate shuffle server crashes.
+This feature is a client-side behaviour, in which the shuffle writer sends each block to multiple servers, and shuffle readers can fetch block data from any one of those servers.
+Since sending multiple replicas of blocks can reduce shuffle performance and increase resource consumption, we designed it as an optional feature.
+
+|Property Name|Default|Description|
+|---|---|---|
+|<client_type>.rss.data.replica|1|The max number of servers that each block can be sent to by the client in the quorum protocol|
+|<client_type>.rss.data.replica.write|1|The min number of servers that each block must be sent to successfully by the client|
+|<client_type>.rss.data.replica.read|1|The min number of servers from which metadata must be fetched successfully by the client|
+
+Notice: 
+
+1. `spark.rss.data.replica.write` + `spark.rss.data.replica.read` > `spark.rss.data.replica`
+
+Recommended examples:
+
+1. Performance First (default)
+```
+spark.rss.data.replica 1
+spark.rss.data.replica.write 1
+spark.rss.data.replica.read 1
+```
+
+2. Fault-tolerant First
+```
+spark.rss.data.replica 3
+spark.rss.data.replica.write 2
+spark.rss.data.replica.read 2
+```
+
+### Spark Specialized Setting
+
+The important configuration is listed as following.
+
+|Property Name|Default|Description|
+|---|---|---|
+|spark.rss.writer.buffer.spill.size|128m|Buffer size for total partition data|
+|spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
+
+
+### MapReduce Specialized Setting
+
+|Property Name|Default|Description|
+|---|---|---|
+|mapreduce.rss.client.max.buffer.size|3k|The max buffer size in map side|
+|mapreduce.rss.client.batch.trigger.num|50|The max batch of buffers to send data in map side|
\ No newline at end of file
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
new file mode 100644
index 0000000..049ec18
--- /dev/null
+++ b/docs/coordinator_guide.md
@@ -0,0 +1,8 @@
+---
+layout: page
+displayTitle: Firestorm Coordinator Guide
+title: Firestorm Coordinator Guide
+description: Firestorm Coordinator Guide
+---
+
+# Firestorm Coordinator Guide
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
index 3af2657..5d2599f 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -12,6 +12,14 @@ to store shuffle data on remote servers.
 ![Rss Architecture](asset/rss_architecture.png)
 
 
+More advanced details for Firestorm users are available in the following:
+
+- [Firestorm Coordinator Guide](coordinator_guide.html)
+
+- [Firestorm Shuffle Server Guide](server_guide.html)
+
+- [Firestorm Shuffle Client Guide](client_guide.html)
+
 
 Here you can read API docs for Firestorm along with its submodules.
 
diff --git a/docs/pageA.md b/docs/pageA.md
deleted file mode 100644
index 50ef92e..0000000
--- a/docs/pageA.md
+++ /dev/null
@@ -1,7 +0,0 @@
----
-layout: page
-displayTitle: A
-title: A
-description: Firestorm documentation homepage
----
- 
\ No newline at end of file
diff --git a/docs/server_guide.md b/docs/server_guide.md
new file mode 100644
index 0000000..3356f61
--- /dev/null
+++ b/docs/server_guide.md
@@ -0,0 +1,7 @@
+---
+layout: page
+displayTitle: Firestorm Shuffle Server Guide
+title: Firestorm Shuffle Server Guide
+description: Firestorm Shuffle Server Guide
+---
+# Firestorm Shuffle Server Guide
\ No newline at end of file


[incubator-uniffle] 09/17: [Improvement] Add dynamic allocation patch for Spark 3.2 (#199)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit a253b1fed2e947e397b45b1db8f56d856eabc9fc
Author: roryqi <je...@gmail.com>
AuthorDate: Mon Jun 27 10:07:13 2022 +0800

    [Improvement] Add dynamic allocation patch for Spark 3.2 (#199)
    
    ### What changes were proposed in this pull request?
    Add the dynamic allocation patch for Spark 3.2, solve issue #106
    
    ### Why are the changes needed?
    If we don't have this patch, users can't use dynamic allocation in Spark 3.2.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test
---
 README.md                                          |  2 +-
 .../spark-3.2.1_dynamic_allocation_support.patch   | 92 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 0fb65e5..9ad8299 100644
--- a/README.md
+++ b/README.md
@@ -155,7 +155,7 @@ rss-xxx.tgz will be generated for deployment
 ### Support Spark dynamic allocation
 
 To support spark dynamic allocation with Firestorm, spark code should be updated.
-There are 2 patches for spark-2.4.6 and spark-3.1.2 in spark-patches folder for reference.
+There are 3 patches for spark (2.4.6/3.1.2/3.2.1) in spark-patches folder for reference.
 
 After apply the patch and rebuild spark, add following configuration in spark conf to enable dynamic allocation:
   ```
diff --git a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
new file mode 100644
index 0000000..1e195df
--- /dev/null
+++ b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
@@ -0,0 +1,92 @@
+diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
+index 1b4e7ba5106..95818ff72ca 100644
+--- a/core/src/main/scala/org/apache/spark/Dependency.scala
++++ b/core/src/main/scala/org/apache/spark/Dependency.scala
+@@ -174,8 +174,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
+       !rdd.isBarrier()
+   }
+ 
+-  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+-  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
++  if (!_rdd.context.getConf.isRssEnable()) {
++    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
++    _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
++  }
+ }
+ 
+ 
+diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+index c4b619300b5..821a01985d9 100644
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+@@ -207,7 +207,9 @@ private[spark] class ExecutorAllocationManager(
+       // If dynamic allocation shuffle tracking or worker decommissioning along with
+       // storage shuffle decommissioning is enabled we have *experimental* support for
+       // decommissioning without a shuffle service.
+-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
++      if (conf.isRssEnable()) {
++        logInfo("Dynamic allocation will use remote shuffle service")
++      } else if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+           (decommissionEnabled &&
+             conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
+         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
+diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
+index 5f37a1abb19..af4bee1e1bb 100644
+--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
++++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
+@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
+     Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
+   }
+ 
++  /**
++   * Return true if remote shuffle service is enabled.
++   */
++  def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
+ }
+ 
+ private[spark] object SparkConf extends Logging {
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+index a82d261d545..72e54940ca2 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+@@ -2231,7 +2231,8 @@ private[spark] class DAGScheduler(
+     // if the cluster manager explicitly tells us that the entire worker was lost, then
+     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the Worker.)
+-    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
++    val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
++      !sc.getConf.isRssEnable()
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+index 3b72103f993..8e03754941e 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+@@ -1015,7 +1015,8 @@ private[spark] class TaskSetManager(
+     // and we are not using an external shuffle server which could serve the shuffle outputs.
+     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
+     // so we would need to rerun these tasks on other executors.
+-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
++    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled &&
++        !isZombie && !conf.isRssEnable()) {
+       for ((tid, info) <- taskInfos if info.executorId == execId) {
+         val index = taskInfos(tid).index
+         // We may have a running task whose partition has been marked as successful,
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+index 47d61196fe8..98a5381bef4 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+@@ -174,6 +174,9 @@ class ShuffledRowRDD(
+   }
+ 
+   override def getPreferredLocations(partition: Partition): Seq[String] = {
++    if (conf.isRssEnable()) {
++      return Nil
++    }
+     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+     partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
+       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
+-- 
+2.32.0
+


[incubator-uniffle] 05/17: upgrade to 0.6.0-snapshot (#190)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 8d8e6bf81ebf0bbb669642a46d13581927f9cec9
Author: roryqi <je...@gmail.com>
AuthorDate: Wed Jun 22 17:36:33 2022 +0800

    upgrade to 0.6.0-snapshot (#190)
    
    ### What changes were proposed in this pull request?
    upgrade version number
    
    ### Why are the changes needed?
    upgrade to 0.6.0-snapshot
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    no
---
 client-mr/pom.xml                     | 4 ++--
 client-spark/common/pom.xml           | 4 ++--
 client-spark/spark2/pom.xml           | 4 ++--
 client-spark/spark3/pom.xml           | 4 ++--
 client/pom.xml                        | 4 ++--
 common/pom.xml                        | 2 +-
 coordinator/pom.xml                   | 2 +-
 integration-test/common/pom.xml       | 4 ++--
 integration-test/mr/pom.xml           | 4 ++--
 integration-test/spark-common/pom.xml | 4 ++--
 integration-test/spark2/pom.xml       | 4 ++--
 integration-test/spark3/pom.xml       | 4 ++--
 internal-client/pom.xml               | 4 ++--
 pom.xml                               | 2 +-
 proto/pom.xml                         | 2 +-
 server/pom.xml                        | 2 +-
 storage/pom.xml                       | 2 +-
 17 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/client-mr/pom.xml b/client-mr/pom.xml
index c15ffba..650a771 100644
--- a/client-mr/pom.xml
+++ b/client-mr/pom.xml
@@ -23,13 +23,13 @@
     <parent>
         <artifactId>rss-main</artifactId>
         <groupId>com.tencent.rss</groupId>
-        <version>0.5.0-snapshot</version>
+        <version>0.6.0-snapshot</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-client-mr</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/client-spark/common/pom.xml b/client-spark/common/pom.xml
index 61c4b1f..e79a671 100644
--- a/client-spark/common/pom.xml
+++ b/client-spark/common/pom.xml
@@ -25,12 +25,12 @@
     <parent>
         <artifactId>rss-main</artifactId>
         <groupId>com.tencent.rss</groupId>
-        <version>0.5.0-snapshot</version>
+        <version>0.6.0-snapshot</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
     <artifactId>rss-client-spark-common</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/client-spark/spark2/pom.xml b/client-spark/spark2/pom.xml
index 41a4432..54434d5 100644
--- a/client-spark/spark2/pom.xml
+++ b/client-spark/spark2/pom.xml
@@ -24,13 +24,13 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
   <groupId>com.tencent.rss</groupId>
   <artifactId>rss-client-spark2</artifactId>
-  <version>0.5.0-snapshot</version>
+  <version>0.6.0-snapshot</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/client-spark/spark3/pom.xml b/client-spark/spark3/pom.xml
index 5674613..8cd091e 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/spark3/pom.xml
@@ -24,13 +24,13 @@
     <parent>
         <artifactId>rss-main</artifactId>
         <groupId>com.tencent.rss</groupId>
-        <version>0.5.0-snapshot</version>
+        <version>0.6.0-snapshot</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-client-spark3</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/client/pom.xml b/client/pom.xml
index e6134ce..1b4e3d7 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -24,12 +24,12 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
   </parent>
 
   <groupId>com.tencent.rss</groupId>
   <artifactId>rss-client</artifactId>
-  <version>0.5.0-snapshot</version>
+  <version>0.6.0-snapshot</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/common/pom.xml b/common/pom.xml
index b4b65f8..9d6b2df 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index e860a50..28b5b5c 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/integration-test/common/pom.xml b/integration-test/common/pom.xml
index 2a759a4..179ecb8 100644
--- a/integration-test/common/pom.xml
+++ b/integration-test/common/pom.xml
@@ -24,13 +24,13 @@
     <parent>
         <groupId>com.tencent.rss</groupId>
         <artifactId>rss-main</artifactId>
-        <version>0.5.0-snapshot</version>
+        <version>0.6.0-snapshot</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-integration-common-test</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/integration-test/mr/pom.xml b/integration-test/mr/pom.xml
index 489ffd5..6ae8a17 100644
--- a/integration-test/mr/pom.xml
+++ b/integration-test/mr/pom.xml
@@ -22,14 +22,14 @@
     <parent>
         <artifactId>rss-main</artifactId>
         <groupId>com.tencent.rss</groupId>
-        <version>0.5.0-snapshot</version>
+        <version>0.6.0-snapshot</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-integration-mr-test</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/integration-test/spark-common/pom.xml b/integration-test/spark-common/pom.xml
index 284ca2b..8f642a5 100644
--- a/integration-test/spark-common/pom.xml
+++ b/integration-test/spark-common/pom.xml
@@ -23,14 +23,14 @@
   <parent>
     <artifactId>rss-main</artifactId>
     <groupId>com.tencent.rss</groupId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.tencent.rss</groupId>
   <artifactId>rss-integration-spark-common-test</artifactId>
-  <version>0.5.0-snapshot</version>
+  <version>0.6.0-snapshot</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/integration-test/spark2/pom.xml b/integration-test/spark2/pom.xml
index 9e63365..554494e 100644
--- a/integration-test/spark2/pom.xml
+++ b/integration-test/spark2/pom.xml
@@ -23,14 +23,14 @@
   <parent>
     <artifactId>rss-main</artifactId>
     <groupId>com.tencent.rss</groupId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.tencent.rss</groupId>
   <artifactId>rss-integration-spark2-test</artifactId>
-  <version>0.5.0-snapshot</version>
+  <version>0.6.0-snapshot</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/integration-test/spark3/pom.xml b/integration-test/spark3/pom.xml
index dd968e0..60406e0 100644
--- a/integration-test/spark3/pom.xml
+++ b/integration-test/spark3/pom.xml
@@ -23,14 +23,14 @@
     <parent>
         <artifactId>rss-main</artifactId>
         <groupId>com.tencent.rss</groupId>
-        <version>0.5.0-snapshot</version>
+        <version>0.6.0-snapshot</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-integration-spark3-test</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <packaging>jar</packaging>
 
     <dependencies>
diff --git a/internal-client/pom.xml b/internal-client/pom.xml
index d1f93de..42e697f 100644
--- a/internal-client/pom.xml
+++ b/internal-client/pom.xml
@@ -24,12 +24,12 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
   </parent>
 
   <groupId>com.tencent.rss</groupId>
   <artifactId>rss-internal-client</artifactId>
-  <version>0.5.0-snapshot</version>
+  <version>0.6.0-snapshot</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/pom.xml b/pom.xml
index 1d712ce..aef4fec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
 
   <groupId>com.tencent.rss</groupId>
   <artifactId>rss-main</artifactId>
-  <version>0.5.0-snapshot</version>
+  <version>0.6.0-snapshot</version>
   <packaging>pom</packaging>
   <name>Remote Shuffle Service Project Parent POM</name>
   <description>Remote Shuffle Service Project</description>
diff --git a/proto/pom.xml b/proto/pom.xml
index 707a964..a96d505 100644
--- a/proto/pom.xml
+++ b/proto/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <artifactId>rss-main</artifactId>
     <groupId>com.tencent.rss</groupId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
   </parent>
 
   <artifactId>rss-proto</artifactId>
diff --git a/server/pom.xml b/server/pom.xml
index c0f8675..dfd09c9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/storage/pom.xml b/storage/pom.xml
index 9f9f355..aa412a7 100644
--- a/storage/pom.xml
+++ b/storage/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>com.tencent.rss</groupId>
     <artifactId>rss-main</artifactId>
-    <version>0.5.0-snapshot</version>
+    <version>0.6.0-snapshot</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 


[incubator-uniffle] 11/17: Support build_distribution.sh to specify different mvn build options for Spark2 and Spark3 (#203)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 392c88129f2706043ebb87cc89e9e2cde5733647
Author: cxzl25 <cx...@users.noreply.github.com>
AuthorDate: Tue Jun 28 10:09:01 2022 +0800

    Support build_distribution.sh to specify different mvn build options for Spark2 and Spark3 (#203)
    
    What changes were proposed in this pull request?
    Add --spark2-mvn and --spark3-mvn parameters to build_distribution.sh so that Spark2 and Spark3 can be compiled with different Maven options, such as the profile or the Spark version.
    Add a --help parameter to build_distribution.sh and fix a typo.
    Make .gitignore ignore the tar package generated by the build.
    Document how to use build_distribution.sh in the README.
    Why are the changes needed?
    If we build with a command like the one below, Spark2 is also compiled against the Spark3 version, so the build options for the two versions should be kept separate.
    
    ./build_distribution.sh -Pspark3.2
    Does this PR introduce any user-facing change?
    No
    
    How was this patch tested?
    local test
---
 .gitignore            |  1 +
 README.md             | 16 ++++++++++++++++
 build_distribution.sh | 53 +++++++++++++++++++++++++++++++++++++++++++++++----
 pom.xml               |  4 ++--
 4 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5c39d59..b6164b2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,4 +20,5 @@ reports/
 metastore_db/
 derby.log
 dependency-reduced-pom.xml
+rss-*.tgz
 
diff --git a/README.md b/README.md
index 9ad8299..51a1ed0 100644
--- a/README.md
+++ b/README.md
@@ -50,10 +50,26 @@ To build it, run:
 
     mvn -DskipTests clean package
 
+Build against profile Spark2(2.4.6)
+
+    mvn -DskipTests clean package -Pspark2
+
+Build against profile Spark3(3.1.2)
+
+    mvn -DskipTests clean package -Pspark3
+
+Build against Spark 3.2.x
+
+    mvn -DskipTests clean package -Pspark3.2
+
 To package the Firestorm, run:
 
     ./build_distribution.sh
 
+Package against Spark 3.2.x, run:
+
+    ./build_distribution.sh --spark3-profile 'spark3.2'
+
 rss-xxx.tgz will be generated for deployment
 
 ## Deploy
diff --git a/build_distribution.sh b/build_distribution.sh
index baf50e4..214a2ed 100755
--- a/build_distribution.sh
+++ b/build_distribution.sh
@@ -32,12 +32,57 @@ RSS_HOME="$(
 
 function exit_with_usage() {
   set +x
-  echo "$0 - tool for making binary distributions of Rmote Shuffle Service"
+  echo "./build_distribution.sh - Tool for making binary distributions of Remote Shuffle Service"
   echo ""
-  echo "usage:"
+  echo "Usage:"
+  echo "+------------------------------------------------------------------------------------------------------+"
+  echo "| ./build_distribution.sh [--spark2-profile <spark2 profile id>] [--spark2-mvn <custom maven options>] |"
+  echo "|                         [--spark3-profile <spark3 profile id>] [--spark3-mvn <custom maven options>] |"
+  echo "|                         <maven build options>                                                        |"
+  echo "+------------------------------------------------------------------------------------------------------+"
   exit 1
 }
 
+SPARK2_PROFILE_ID="spark2"
+SPARK2_MVN_OPTS=""
+SPARK3_PROFILE_ID="spark3"
+SPARK3_MVN_OPTS=""
+while (( "$#" )); do
+  case $1 in
+    --spark2-profile)
+      SPARK2_PROFILE_ID="$2"
+      shift
+      ;;
+    --spark2-mvn)
+      SPARK2_MVN_OPTS=$2
+      shift
+      ;;
+    --spark3-profile)
+      SPARK3_PROFILE_ID="$2"
+      shift
+      ;;
+    --spark3-mvn)
+      SPARK3_MVN_OPTS=$2
+      shift
+      ;;
+    --help)
+      exit_with_usage
+      ;;
+    --*)
+      echo "Error: $1 is not supported"
+      exit_with_usage
+      ;;
+    -*)
+      break
+      ;;
+    *)
+      echo "Error: $1 is not supported"
+      exit_with_usage
+      ;;
+  esac
+  shift
+done
+
 cd $RSS_HOME
 
 if [ -z "$JAVA_HOME" ]; then
@@ -99,7 +144,7 @@ cp "${RSS_HOME}"/coordinator/target/jars/* ${COORDINATOR_JAR_DIR}
 CLIENT_JAR_DIR="${DISTDIR}/jars/client"
 mkdir -p $CLIENT_JAR_DIR
 
-BUILD_COMMAND_SPARK2=("$MVN" clean package -Pspark2 -pl client-spark/spark2 -DskipTests -am $@)
+BUILD_COMMAND_SPARK2=("$MVN" clean package -P$SPARK2_PROFILE_ID -pl client-spark/spark2 -DskipTests -am $@ $SPARK2_MVN_OPTS)
 
 # Actually build the jar
 echo -e "\nBuilding with..."
@@ -114,7 +159,7 @@ SPARK_CLIENT2_JAR="${RSS_HOME}/client-spark/spark2/target/shaded/rss-client-spar
 echo "copy $SPARK_CLIENT2_JAR to ${SPARK_CLIENT2_JAR_DIR}"
 cp $SPARK_CLIENT2_JAR ${SPARK_CLIENT2_JAR_DIR}
 
-BUILD_COMMAND_SPARK3=("$MVN" clean package -Pspark3 -pl client-spark/spark3 -DskipTests -am $@)
+BUILD_COMMAND_SPARK3=("$MVN" clean package -P$SPARK3_PROFILE_ID -pl client-spark/spark3 -DskipTests -am $@ $SPARK3_MVN_OPTS)
 
 echo -e "\nBuilding with..."
 echo -e "\$ ${BUILD_COMMAND_SPARK3[@]}\n"
diff --git a/pom.xml b/pom.xml
index aef4fec..2f412f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -795,12 +795,12 @@
       <plugin>
         <groupId>com.github.spotbugs</groupId>
         <artifactId>spotbugs-maven-plugin</artifactId>
-	<version>${spotbugs-maven-plugin.version}</version>
+        <version>${spotbugs-maven-plugin.version}</version>
         <dependencies>
           <dependency>
             <groupId>com.github.spotbugs</groupId>
             <artifactId>spotbugs</artifactId>
-	    <version>${spotbugs.version}</version>
+            <version>${spotbugs.version}</version>
           </dependency>
         </dependencies>
         <configuration>


[incubator-uniffle] 15/17: [Minor] Make clearResourceThread and processEventThread daemon (#207)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit ba47aa017f67e681af7c311c4ef8578eef740d4b
Author: Zhen Wang <64...@qq.com>
AuthorDate: Thu Jun 30 14:56:54 2022 +0800

    [Minor] Make clearResourceThread and processEventThread daemon (#207)
    
    ### What changes were proposed in this pull request?
    Make clearResourceThread daemon and processEventThread daemon.
    
    ### Why are the changes needed?
    `clearResourceThread` and `processEventThread` never exit, so we can make them daemon threads.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No
---
 .../java/com/tencent/rss/server/ShuffleFlushManager.java     | 12 ++++++++----
 .../main/java/com/tencent/rss/server/ShuffleTaskManager.java |  1 +
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
index e246b02..be941ac 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java
@@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.RangeMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -60,7 +61,6 @@ public class ShuffleFlushManager {
   private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap();
   // appId -> shuffleId -> committed shuffle blockIds
   private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds = Maps.newConcurrentMap();
-  private Runnable processEventThread;
   private final int retryMax;
 
   private final StorageManager storageManager;
@@ -84,11 +84,12 @@ public class ShuffleFlushManager {
     BlockingQueue<Runnable> waitQueue = Queues.newLinkedBlockingQueue(waitQueueSize);
     int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
     long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
-    threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue);
+    threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build());
     storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
     pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
     // the thread for flush data
-    processEventThread = () -> {
+    Runnable processEventRunnable = () -> {
       while (true) {
         try {
           ShuffleDataFlushEvent event = flushQueue.take();
@@ -103,7 +104,10 @@ public class ShuffleFlushManager {
         }
       }
     };
-    new Thread(processEventThread).start();
+    Thread processEventThread = new Thread(processEventRunnable);
+    processEventThread.setName("ProcessEventThread");
+    processEventThread.setDaemon(true);
+    processEventThread.start();
     // todo: extract a class named Service, and support stop method
     Thread thread = new Thread("PendingEventProcessThread") {
       @Override
diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
index e847779..fc37a19 100644
--- a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
+++ b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java
@@ -123,6 +123,7 @@ public class ShuffleTaskManager {
     };
     Thread thread = new Thread(clearResourceThread);
     thread.setName("clearResourceThread");
+    thread.setDaemon(true);
     thread.start();
   }
 


[incubator-uniffle] 07/17: [Minor] Remove serverNode from tags structure when heartbeart timeout (#193)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit d92208ddb1edca13fcb6cb31a8980b2052f29d7b
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Thu Jun 23 15:30:19 2022 +0800

    [Minor] Remove serverNode from tags structure when heartbeart timeout (#193)
    
    ### What changes were proposed in this pull request?
    Remove serverNode from tags structure when heartbeat timeout
    
    ### Why are the changes needed?
    Remove serverNode from tags structure when heartbeat timeout
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
---
 .../com/tencent/rss/coordinator/ServerNode.java    |  7 ++++++
 .../rss/coordinator/SimpleClusterManager.java      |  9 ++++++--
 .../rss/coordinator/SimpleClusterManagerTest.java  | 27 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
index ef09298..816f080 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
@@ -115,6 +115,13 @@ public class ServerNode implements Comparable<ServerNode> {
         + ", healthy[" + isHealthy + "]";
   }
 
+  /**
+   * Only for test case
+   */
+  void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
   @Override
   public int compareTo(ServerNode other) {
     if (availableMemory > other.getAvailableMemory()) {
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
index d3fe789..10af74d 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
@@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager {
     }
   }
 
-  private void nodesCheck() {
+  void nodesCheck() {
     try {
       long timestamp = System.currentTimeMillis();
       Set<String> deleteIds = Sets.newHashSet();
@@ -83,7 +83,12 @@ public class SimpleClusterManager implements ClusterManager {
         }
       }
       for (String serverId : deleteIds) {
-        servers.remove(serverId);
+        ServerNode sn = servers.remove(serverId);
+        if (sn != null) {
+          for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
+            nodesWithTag.remove(sn);
+          }
+        }
       }
 
       CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
index a5040bf..bed9081 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -142,6 +143,32 @@ public class SimpleClusterManagerTest {
     assertEquals(0, serverNodes.size());
   }
 
+  @Test
+  public void testGetCorrectServerNodesWhenOneNodeRemoved() {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+            10, testTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+            10, testTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+            11, testTags, true);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
+    assertEquals(3, serverNodes.size());
+
+    sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L);
+    clusterManager.nodesCheck();
+
+    Map<String, Set<ServerNode>> tagToNodes = clusterManager.getTagToNodes();
+    List<ServerNode> serverList = clusterManager.getServerList(testTags);
+    Assertions.assertEquals(2, tagToNodes.get(testTags.iterator().next()).size());
+    Assertions.assertEquals(2, serverList.size());
+  }
+
   @Test
   public void updateExcludeNodesTest() throws Exception {
     String excludeNodesFolder = (new File(ClassLoader.getSystemResource("empty").getFile())).getParent();


[incubator-uniffle] 03/17: [Bugfix] Fix spark2 executor stop NPE problem (#186)

Posted by js...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 924dac7f093d0b3f581e521fc71bc30ea0963907
Author: roryqi <je...@gmail.com>
AuthorDate: Wed Jun 22 14:34:06 2022 +0800

    [Bugfix] Fix spark2 executor stop NPE problem (#186)
    
    ### What changes were proposed in this pull request?
    We need to check whether heartbeatExecutorService is null before stopping it.
    
    ### Why are the changes needed?
    PR #177 introduced this problem: when we run Spark applications on our cluster, the executor throws an NPE when the `stop` method is called.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test