You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/07 14:19:27 UTC

[incubator-uniffle] branch master updated: [ISSUE-186][Feature] Use I/O cost time to select storage paths (#192)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a03711 [ISSUE-186][Feature] Use I/O cost time to select storage paths (#192)
78a03711 is described below

commit 78a037114b5ec02fb27c0e3728798b716da6a2c0
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Wed Sep 7 22:19:22 2022 +0800

    [ISSUE-186][Feature] Use I/O cost time to select storage paths (#192)
    
    ### What changes were proposed in this pull request?
    Solve #186
    
    ### Why are the changes needed?
    A better strategy to select remote path.
    
    ### Does this PR introduce _any_ user-facing change?
    Four configs added.
    1. `rss.coordinator.remote.storage.select.strategy` supports `APP_BALANCE` and `IO_SAMPLE`. `APP_BALANCE` selects a remote path based on the number of apps; `IO_SAMPLE` selects a remote path based on the time consumed by reading and writing files.
    2. `rss.coordinator.remote.storage.io.sample.schedule.time`: if the user chooses `IO_SAMPLE`, a sample file will be read and written at this regular interval.
    3. `rss.coordinator.remote.storage.io.sample.file.size` , the size of each read / write HDFS file.
    4. `rss.coordinator.remote.storage.io.sample.access.times`, number of times to read and write HDFS files.
    
    ### How was this patch tested?
    Added ut.
---
 coordinator/pom.xml                                |   4 +
 .../AppBalanceSelectStorageStrategy.java           | 144 +++++++++++
 .../uniffle/coordinator/ApplicationManager.java    |  97 +++-----
 .../uniffle/coordinator/CoordinatorConf.java       |  22 +-
 .../LowestIOSampleCostSelectStorageStrategy.java   | 266 +++++++++++++++++++++
 .../uniffle/coordinator/SelectStorageStrategy.java |  40 ++++
 .../AppBalanceSelectStorageStrategyTest.java       | 156 ++++++++++++
 .../coordinator/ApplicationManagerTest.java        | 105 +-------
 .../uniffle/coordinator/ClientConfManagerTest.java | 102 +++++++-
 ...owestIOSampleCostSelectStorageStrategyTest.java | 201 ++++++++++++++++
 docs/coordinator_guide.md                          |   4 +
 .../apache/uniffle/test/FetchClientConfTest.java   |  57 ++++-
 12 files changed, 1028 insertions(+), 170 deletions(-)

diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index 686534a6..368742d3 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -73,6 +73,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-inline</artifactId>
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
new file mode 100644
index 00000000..c42da449
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue;
+
+/**
+ * AppBalanceSelectStorageStrategy keeps the number of apps allocated to each remote path balanced.
+ */
+public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathCounter;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+
+  public AppBalanceSelectStorageStrategy() {
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathCounter = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+  }
+
+  /**
+   * the strategy of pick remote storage is according to assignment count
+   */
+  @Override
+  public RemoteStorageInfo pickRemoteStorage(String appId) {
+    if (appIdToRemoteStorageInfo.containsKey(appId)) {
+      return appIdToRemoteStorageInfo.get(appId);
+    }
+
+    // create list for sort
+    List<Map.Entry<String, RankValue>> sizeList =
+        Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
+            .sorted(Comparator.comparingInt(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+
+    for (Map.Entry<String, RankValue> entry : sizeList) {
+      String storagePath = entry.getKey();
+      if (availableRemoteStorageInfo.containsKey(storagePath)) {
+        appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath));
+        incRemoteStorageCounter(storagePath);
+        break;
+      }
+    }
+    return appIdToRemoteStorageInfo.get(appId);
+  }
+
+  /** Increments the app count of the given remote path, creating the counter at 1 if it is missing. */
+  @Override
+  @VisibleForTesting
+  public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
+    RankValue counter = remoteStoragePathCounter.get(remoteStoragePath);
+    if (counter != null) {
+      counter.getAppNum().incrementAndGet();
+    } else {
+      // may happen when a remote storage is assigned while the remote storages are being refreshed;
+      // SLF4J only substitutes "{}" placeholders, a printf-style "%s" would be logged literally
+      LOG.warn("Remote storage path lost during assignment: {} doesn't exist, reset it to 1", remoteStoragePath);
+      remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1));
+    }
+  }
+
+  /** Decrements the app count of a remote path; removes the counter of an unused, unavailable path. */
+  @Override
+  @VisibleForTesting
+  public synchronized void decRemoteStorageCounter(String storagePath) {
+    if (!StringUtils.isEmpty(storagePath)) {
+      RankValue atomic = remoteStoragePathCounter.get(storagePath);
+      if (atomic != null) {
+        int count = atomic.getAppNum().decrementAndGet();
+        if (count < 0) {
+          LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0", storagePath, count);
+          atomic.getAppNum().set(0);
+        }
+      } else {
+        LOG.warn("Can't find counter for remote storage: {}", storagePath);
+        remoteStoragePathCounter.putIfAbsent(storagePath, new RankValue(0));
+      }
+      if (remoteStoragePathCounter.get(storagePath).getAppNum().get() == 0
+          && !availableRemoteStorageInfo.containsKey(storagePath)) {
+        remoteStoragePathCounter.remove(storagePath);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void removePathFromCounter(String storagePath) {
+    RankValue atomic = remoteStoragePathCounter.get(storagePath);
+    if (atomic != null && atomic.getAppNum().get() == 0) {
+      remoteStoragePathCounter.remove(storagePath);
+    }
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
+    return appIdToRemoteStorageInfo;
+  }
+
+  @Override
+  public Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathCounter;
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo() {
+    return availableRemoteStorageInfo;
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 40a1e1ee..6fcd6b56 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -19,16 +19,12 @@ package org.apache.uniffle.coordinator;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -41,24 +37,38 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue;
 
 public class ApplicationManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationManager.class);
   private long expired;
+  private StrategyName storageStrategy;
   private Map<String, Long> appIds = Maps.newConcurrentMap();
+  private SelectStorageStrategy selectStorageStrategy;
   // store appId -> remote path to make sure all shuffle data of the same application
   // will be written to the same remote storage
-  private Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+  private Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
   // store remote path -> application count for assignment strategy
-  private Map<String, AtomicInteger> remoteStoragePathCounter = Maps.newConcurrentMap();
+  private Map<String, RankValue> remoteStoragePathRankValue;
   private Map<String, String> remoteStorageToHost = Maps.newConcurrentMap();
-  private Map<String, RemoteStorageInfo> availableRemoteStorageInfo = Maps.newHashMap();
+  private Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
   private ScheduledExecutorService scheduledExecutorService;
   // it's only for test case to check if status check has problem
   private boolean hasErrorInStatusCheck = false;
 
   public ApplicationManager(CoordinatorConf conf) {
+    storageStrategy = conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY);
+    if (StrategyName.IO_SAMPLE == storageStrategy) {
+      selectStorageStrategy = new LowestIOSampleCostSelectStorageStrategy(conf);
+    } else if (StrategyName.APP_BALANCE == storageStrategy) {
+      selectStorageStrategy = new AppBalanceSelectStorageStrategy();
+    } else {
+      throw new UnsupportedOperationException("Unsupported selected storage strategy.");
+    }
+    appIdToRemoteStorageInfo = selectStorageStrategy.getAppIdToRemoteStorageInfo();
+    remoteStoragePathRankValue = selectStorageStrategy.getRemoteStoragePathRankValue();
+    availableRemoteStorageInfo = selectStorageStrategy.getAvailableRemoteStorageInfo();
     expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
     // the thread for checking application status
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
@@ -83,7 +93,7 @@ public class ApplicationManager {
       // add remote path if not exist
       for (String path : paths) {
         if (!availableRemoteStorageInfo.containsKey(path)) {
-          remoteStoragePathCounter.putIfAbsent(path, new AtomicInteger(0));
+          remoteStoragePathRankValue.putIfAbsent(path, new RankValue(0));
           // refreshRemoteStorage is designed without multiple thread problem
           // metrics shouldn't be added duplicated
           addRemoteStorageMetrics(path);
@@ -117,67 +127,17 @@ public class ApplicationManager {
   // the strategy of pick remote storage is according to assignment count
   // todo: better strategy with workload balance
   public RemoteStorageInfo pickRemoteStorage(String appId) {
-    if (appIdToRemoteStorageInfo.containsKey(appId)) {
-      return appIdToRemoteStorageInfo.get(appId);
-    }
-
-    // create list for sort
-    List<Map.Entry<String, AtomicInteger>> sizeList =
-        Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
-            .sorted(Comparator.comparingInt(entry -> entry.getValue().get())).collect(Collectors.toList());
-
-    for (Map.Entry<String, AtomicInteger> entry : sizeList) {
-      String storagePath = entry.getKey();
-      if (availableRemoteStorageInfo.containsKey(storagePath)) {
-        appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath));
-        incRemoteStorageCounter(storagePath);
-        break;
-      }
-    }
+    selectStorageStrategy.pickRemoteStorage(appId);
     return appIdToRemoteStorageInfo.get(appId);
   }
 
-  @VisibleForTesting
-  protected synchronized void incRemoteStorageCounter(String remoteStoragePath) {
-    AtomicInteger counter = remoteStoragePathCounter.get(remoteStoragePath);
-    if (counter != null) {
-      counter.incrementAndGet();
-    } else {
-      // it may be happened when assignment remote storage
-      // and refresh remote storage at the same time
-      LOG.warn("Remote storage path lost during assignment: %s doesn't exist, reset it to 1",
-          remoteStoragePath);
-      remoteStoragePathCounter.put(remoteStoragePath, new AtomicInteger(1));
-    }
-  }
-
   @VisibleForTesting
   protected synchronized void decRemoteStorageCounter(String storagePath) {
-    if (!StringUtils.isEmpty(storagePath)) {
-      AtomicInteger atomic = remoteStoragePathCounter.get(storagePath);
-      if (atomic != null) {
-        int count = atomic.decrementAndGet();
-        if (count < 0) {
-          LOG.warn("Unexpected counter for remote storage: %s, which is %i, reset to 0",
-              storagePath, count);
-          atomic.set(0);
-        }
-      } else {
-        LOG.warn("Can't find counter for remote storage: {}", storagePath);
-        remoteStoragePathCounter.putIfAbsent(storagePath, new AtomicInteger(0));
-      }
-      if (remoteStoragePathCounter.get(storagePath).get() == 0
-          && !availableRemoteStorageInfo.containsKey(storagePath)) {
-        remoteStoragePathCounter.remove(storagePath);
-      }
-    }
+    selectStorageStrategy.decRemoteStorageCounter(storagePath);
   }
 
   private synchronized void removePathFromCounter(String storagePath) {
-    AtomicInteger atomic = remoteStoragePathCounter.get(storagePath);
-    if (atomic != null && atomic.get() == 0) {
-      remoteStoragePathCounter.remove(storagePath);
-    }
+    selectStorageStrategy.removePathFromCounter(storagePath);
   }
 
   public Set<String> getAppIds() {
@@ -185,13 +145,13 @@ public class ApplicationManager {
   }
 
   @VisibleForTesting
-  protected Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
-    return appIdToRemoteStorageInfo;
+  protected Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathRankValue;
   }
 
   @VisibleForTesting
-  protected Map<String, AtomicInteger> getRemoteStoragePathCounter() {
-    return remoteStoragePathCounter;
+  public SelectStorageStrategy getSelectStorageStrategy() {
+    return selectStorageStrategy;
   }
 
   @VisibleForTesting
@@ -237,7 +197,7 @@ public class ApplicationManager {
       try {
         String storageHost = getStorageHost(remoteStoragePath);
         CoordinatorMetrics.updateDynamicGaugeForRemoteStorage(storageHost,
-            remoteStoragePathCounter.get(remoteStoragePath).get());
+            remoteStoragePathRankValue.get(remoteStoragePath).getAppNum().get());
       } catch (Exception e) {
         LOG.warn("Update remote storage metrics for {} failed ", remoteStoragePath);
       }
@@ -266,4 +226,9 @@ public class ApplicationManager {
     }
     return storageHost;
   }
+
+  public enum StrategyName {
+    APP_BALANCE,
+    IO_SAMPLE
+  }
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 34c65b98..28931a42 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -26,6 +26,7 @@ import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.RssUtils;
 
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
 import static org.apache.uniffle.coordinator.AssignmentStrategyFactory.StrategyName.PARTITION_BALANCE;
 
 /**
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.io.sample.schedule.time")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("The time of scheduling the read and write time of the paths to obtain different HDFS");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE = ConfigOptions
+      .key("rss.coordinator.remote.storage.io.sample.file.size")
+      .intType()
+      .defaultValue(204800 * 1000)
+      .withDescription("The size of the file that the scheduled thread reads and writes");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES = ConfigOptions
+      .key("rss.coordinator.remote.storage.io.sample.access.times")
+      .intType()
+      .defaultValue(3)
+      .withDescription("The number of times to read and write HDFS files");
 
   public CoordinatorConf() {
   }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
new file mode 100644
index 00000000..96147b8e
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * LowestIOSampleCostSelectStorageStrategy allocates apps to the remote path with the lowest sampled
+ * read and write time. Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ */
+public class LowestIOSampleCostSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LowestIOSampleCostSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> rank value (sampled read/write time and application count) for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  public LowestIOSampleCostSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() { // samples the write/read cost of every remote path to rank the paths
+    if (remoteStoragePathRankValue.size() > 1) { // sampling is skipped when there are fewer than two paths
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) { // repeat the write/read round trip readAndWriteTimes times
+            byte[] data = RandomUtils.nextBytes(fileSize);
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) { // read-back byte differs from the written one
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          }
+        } catch (Exception e) {
+          LOG.error("Storage read and write error, we will not use this remote path {}.", path, e);
+          RankValue rankValue = remoteStoragePathRankValue.get(path);
+          remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+        } finally { // NOTE(review): the put in sortPathByRankValue may overwrite Long.MAX_VALUE set above - confirm
+          sortPathByRankValue(path, testPath, startWriteTime);
+        }
+      }
+    } else {
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByRankValue(String path, Path testPath, long startWrite) {
+    try {
+      fs.delete(testPath, true);
+      long totalTime = System.currentTimeMillis() - startWrite;
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get()));
+    } catch (Exception e) {
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+      LOG.error("Failed to sort, we will not use this remote path {}.", path, e);
+    } finally {
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(
+              entry -> entry.getValue().getReadAndWriteTime().get())).collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * the strategy of pick remote storage is based on whether the remote path can be read or written
+   */
+  @Override
+  public RemoteStorageInfo pickRemoteStorage(String appId) {
+    if (appIdToRemoteStorageInfo.containsKey(appId)) {
+      return appIdToRemoteStorageInfo.get(appId);
+    }
+
+    for (Map.Entry<String, RankValue> entry : sizeList) {
+      String storagePath = entry.getKey();
+      if (availableRemoteStorageInfo.containsKey(storagePath)) {
+        appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath));
+        incRemoteStorageCounter(storagePath);
+        break;
+      }
+    }
+    return appIdToRemoteStorageInfo.get(appId);
+  }
+
+  /** Increments the app count of a remote path, creating its rank value with app count 1 if it is missing. */
+  @Override
+  @VisibleForTesting
+  public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
+    RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath);
+    if (counter != null) {
+      counter.getAppNum().incrementAndGet();
+    } else {
+      remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1));
+      // may happen on concurrent assignment and refresh; SLF4J substitutes "{}", not printf-style "%s"
+      LOG.warn("Remote storage path lost during assignment: {} doesn't exist, "
+          + "reset the rank value to 0 and app size to 1.", remoteStoragePath);
+    }
+  }
+
+  /** Decrements the app count of a remote path; removes the rank value of an unused, unavailable path. */
+  @Override
+  @VisibleForTesting
+  public synchronized void decRemoteStorageCounter(String storagePath) {
+    if (!StringUtils.isEmpty(storagePath)) {
+      RankValue atomic = remoteStoragePathRankValue.get(storagePath);
+      if (atomic != null) {
+        int count = atomic.getAppNum().decrementAndGet();
+        if (count < 0) {
+          LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0", storagePath, count);
+          atomic.getAppNum().set(0);
+        }
+      } else {
+        // an unknown path has no apps, so start from 0 (not 1) to let the cleanup below remove it
+        remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(0));
+        LOG.warn("Can't find counter for remote storage: {}", storagePath);
+      }
+      if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
+          && !availableRemoteStorageInfo.containsKey(storagePath)) {
+        remoteStoragePathRankValue.remove(storagePath);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void removePathFromCounter(String storagePath) {
+    RankValue rankValue = remoteStoragePathRankValue.get(storagePath);
+    // The time spent reading and writing cannot be used to determine whether the current path is still used by apps.
+    // Therefore, determine whether the HDFS path is still used by the number of apps
+    if (rankValue != null && rankValue.getAppNum().get() == 0) {
+      remoteStoragePathRankValue.remove(storagePath);
+    }
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
+    return appIdToRemoteStorageInfo;
+  }
+
+  @Override
+  public Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathRankValue;
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo() {
+    return availableRemoteStorageInfo;
+  }
+
+  @VisibleForTesting
+  public void setFs(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @VisibleForTesting
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  static class RankValue {
+    AtomicLong readAndWriteTime;
+    AtomicInteger appNum;
+
+    RankValue(int appNum) {
+      this.readAndWriteTime = new AtomicLong(0);
+      this.appNum = new AtomicInteger(appNum);
+    }
+
+    RankValue(long ratioValue, int appNum) {
+      this.readAndWriteTime = new AtomicLong(ratioValue);
+      this.appNum = new AtomicInteger(appNum);
+    }
+
+    public AtomicLong getReadAndWriteTime() {
+      return readAndWriteTime;
+    }
+
+    public AtomicInteger getAppNum() {
+      return appNum;
+    }
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
new file mode 100644
index 00000000..654ec8a0
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Map;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue;
+
+public interface SelectStorageStrategy {
+
+  RemoteStorageInfo pickRemoteStorage(String appId);
+
+  void incRemoteStorageCounter(String remoteStoragePath);
+
+  void decRemoteStorageCounter(String storagePath);
+
+  void removePathFromCounter(String storagePath);
+
+  Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo();
+
+  Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo();
+
+  Map<String, RankValue> getRemoteStoragePathRankValue();
+}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
new file mode 100644
index 00000000..d52d65aa
--- /dev/null
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.util.Constants;
+
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AppBalanceSelectStorageStrategyTest {
+
+  private AppBalanceSelectStorageStrategy appBalanceSelectStorageStrategy;
+  private ApplicationManager applicationManager;
+  private long appExpiredTime = 2000L;
+  private String remotePath1 = "hdfs://path1";
+  private String remotePath2 = "hdfs://path2";
+  private String remotePath3 = "hdfs://path3";
+
+  @BeforeAll
+  public static void setup() {
+    CoordinatorMetrics.register();
+  }
+
+  @AfterAll
+  public static void clear() {
+    CoordinatorMetrics.clear();
+  }
+
+  @BeforeEach
+  public void setUp() {
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
+    conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, APP_BALANCE);
+    applicationManager = new ApplicationManager(conf);
+    appBalanceSelectStorageStrategy = (AppBalanceSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
+  }
+
+  @Test
+  public void selectStorageTest() throws Exception {
+    String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2;
+    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+    assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath1).getAppNum().get());
+    assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+    String storageHost1 = "path1";
+    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
+    String storageHost2 = "path2";
+    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+
+    // do inc for remotePath1 to make sure pick storage will be remotePath2 in next call
+    appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1);
+    appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1);
+    String testApp1 = "testApp1";
+    applicationManager.refreshAppId(testApp1);
+    assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+    assertEquals(remotePath2, appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
+    assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+    // return the same value if did the assignment already
+    assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+    assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+
+    Thread.sleep(appExpiredTime + 2000);
+    assertNull(appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1));
+    assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+
+    // refresh app1, got remotePath2, then remove remotePath2,
+    // it should be existed in counter until it expired
+    applicationManager.refreshAppId(testApp1);
+    assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+    remoteStoragePath = remotePath1;
+    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+    assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)),
+        appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+    assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+    // app1 is expired, remotePath2 is removed because of counter = 0
+    Thread.sleep(appExpiredTime + 2000);
+    assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)),
+        appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+
+    // restore previous manually inc for next test case
+    appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1);
+    appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1);
+    // remove all remote storage
+    applicationManager.refreshRemoteStorage("", "");
+    assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size());
+    assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size());
+    assertFalse(applicationManager.hasErrorInStatusCheck());
+  }
+
+  @Test
+  public void storageCounterMulThreadTest() throws Exception {
+    String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2
+        + Constants.COMMA_SPLIT_CHAR + remotePath3;
+    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+    String appPrefix = "testAppId";
+
+    Thread pickThread1 = new Thread(() -> {
+      for (int i = 0; i < 1000; i++) {
+        String appId = appPrefix + i;
+        applicationManager.refreshAppId(appId);
+        appBalanceSelectStorageStrategy.pickRemoteStorage(appId);
+      }
+    });
+
+    Thread pickThread2 = new Thread(() -> {
+      for (int i = 1000; i < 2000; i++) {
+        String appId = appPrefix + i;
+        applicationManager.refreshAppId(appId);
+        appBalanceSelectStorageStrategy.pickRemoteStorage(appId);
+      }
+    });
+
+    Thread pickThread3 = new Thread(() -> {
+      for (int i = 2000; i < 3000; i++) {
+        String appId = appPrefix + i;
+        applicationManager.refreshAppId(appId);
+        appBalanceSelectStorageStrategy.pickRemoteStorage(appId);
+      }
+    });
+    pickThread1.start();
+    pickThread2.start();
+    pickThread3.start();
+    pickThread1.join();
+    pickThread2.join();
+    pickThread3.join();
+    Thread.sleep(appExpiredTime + 2000);
+
+    applicationManager.refreshRemoteStorage("", "");
+    assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size());
+    assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size());
+    assertFalse(applicationManager.hasErrorInStatusCheck());
+  }
+}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 8cbccbc2..791ec1dc 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -31,7 +31,6 @@ import org.apache.uniffle.common.util.Constants;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ApplicationManagerTest {
@@ -66,19 +65,19 @@ public class ApplicationManagerTest {
     Set<String> expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
     applicationManager.refreshRemoteStorage(remoteStoragePath, "");
     assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
-    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
+    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
 
     remoteStoragePath = remotePath3;
     expectedAvailablePath = Sets.newHashSet(remotePath3);
     applicationManager.refreshRemoteStorage(remoteStoragePath, "");
     assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
-    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
+    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
 
     remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath3;
     expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath3);
     applicationManager.refreshRemoteStorage(remoteStoragePath, remoteStorageConf);
     assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
-    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
+    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
     Map<String, RemoteStorageInfo> storages = applicationManager.getAvailableRemoteStorageInfo();
     RemoteStorageInfo remoteStorageInfo = storages.get(remotePath1);
     assertEquals(2, remoteStorageInfo.getConfItems().size());
@@ -94,59 +93,7 @@ public class ApplicationManagerTest {
     assertEquals(1, remoteStorageInfo.getConfItems().size());
     assertEquals("v3", remoteStorageInfo.getConfItems().get("k3"));
     assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
-    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
-    assertFalse(applicationManager.hasErrorInStatusCheck());
-  }
-
-  @Test
-  public void storageCounterTest() throws Exception {
-    String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2;
-    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
-    assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath1).get());
-    assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
-    String storageHost1 = "path1";
-    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
-    String storageHost2 = "path2";
-    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
-
-    // do inc for remotePath1 to make sure pick storage will be remotePath2 in next call
-    applicationManager.incRemoteStorageCounter(remotePath1);
-    applicationManager.incRemoteStorageCounter(remotePath1);
-    String testApp1 = "testApp1";
-    applicationManager.refreshAppId(testApp1);
-    assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
-    assertEquals(remotePath2, applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
-    assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
-    // return the same value if did the assignment already
-    assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
-    assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
-
-    Thread.sleep(appExpiredTime + 2000);
-    assertNull(applicationManager.getAppIdToRemoteStorageInfo().get(testApp1));
-    assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
-    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
-
-    // refresh app1, got remotePath2, then remove remotePath2,
-    // it should be existed in counter until it expired
-    applicationManager.refreshAppId(testApp1);
-    assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
-    remoteStoragePath = remotePath1;
-    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
-    assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)),
-        applicationManager.getRemoteStoragePathCounter().keySet());
-    assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
-    // app1 is expired, remotePath2 is removed because of counter = 0
-    Thread.sleep(appExpiredTime + 2000);
-    assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)),
-        applicationManager.getRemoteStoragePathCounter().keySet());
-
-    // restore previous manually inc for next test case
-    applicationManager.decRemoteStorageCounter(remotePath1);
-    applicationManager.decRemoteStorageCounter(remotePath1);
-    // remove all remote storage
-    applicationManager.refreshRemoteStorage("", "");
-    assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
-    assertEquals(0, applicationManager.getRemoteStoragePathCounter().size());
+    assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
     assertFalse(applicationManager.hasErrorInStatusCheck());
   }
 
@@ -163,48 +110,4 @@ public class ApplicationManagerTest {
     assertEquals(0, applicationManager.getAppIds().size());
     assertFalse(applicationManager.hasErrorInStatusCheck());
   }
-
-  @Test
-  public void storageCounterMulThreadTest() throws Exception {
-    String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2
-        + Constants.COMMA_SPLIT_CHAR + remotePath3;
-    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
-    String appPrefix = "testAppId";
-
-    Thread pickThread1 = new Thread(() -> {
-      for (int i = 0; i < 1000; i++) {
-        String appId = appPrefix + i;
-        applicationManager.refreshAppId(appId);
-        applicationManager.pickRemoteStorage(appId);
-      }
-    });
-
-    Thread pickThread2 = new Thread(() -> {
-      for (int i = 1000; i < 2000; i++) {
-        String appId = appPrefix + i;
-        applicationManager.refreshAppId(appId);
-        applicationManager.pickRemoteStorage(appId);
-      }
-    });
-
-    Thread pickThread3 = new Thread(() -> {
-      for (int i = 2000; i < 3000; i++) {
-        String appId = appPrefix + i;
-        applicationManager.refreshAppId(appId);
-        applicationManager.pickRemoteStorage(appId);
-      }
-    });
-    pickThread1.start();
-    pickThread2.start();
-    pickThread3.start();
-    pickThread1.join();
-    pickThread2.join();
-    pickThread3.join();
-    Thread.sleep(appExpiredTime + 2000);
-
-    applicationManager.refreshRemoteStorage("", "");
-    assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
-    assertEquals(0, applicationManager.getRemoteStoragePathCounter().size());
-    assertFalse(applicationManager.hasErrorInStatusCheck());
-  }
 }
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
index c5bfc0f2..7a618174 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator;
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.util.Map;
@@ -28,6 +29,10 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -36,6 +41,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.util.Constants;
 
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -43,6 +49,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ClientConfManagerTest {
 
+  @TempDir
+  private final File remotePath = new File("hdfs://rss");
+  private static MiniDFSCluster cluster;
+  private Configuration hdfsConf = new Configuration();
+
   @BeforeEach
   public void setUp() {
     CoordinatorMetrics.register();
@@ -53,6 +64,18 @@ public class ClientConfManagerTest {
     CoordinatorMetrics.clear();
   }
 
+  @AfterAll
+  public static void close() {
+    cluster.close();
+  }
+
+  public void createMiniHdfs(String hdfsPath) throws IOException {
+    hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath());
+    hdfsConf.set("dfs.nameservices", "rss");
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath);
+    cluster = (new MiniDFSCluster.Builder(hdfsConf)).build();
+  }
+
   @Test
   public void test(@TempDir File tempDir) throws Exception {
     File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
@@ -137,7 +160,7 @@ public class ClientConfManagerTest {
   }
 
   @Test
-  public void dynamicRemoteStorageTest() throws Exception {
+  public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception {
     int updateIntervalSec = 2;
     final String remotePath1 = "hdfs://host1/path1";
     final String remotePath2 = "hdfs://host2/path2";
@@ -194,6 +217,83 @@ public class ClientConfManagerTest {
     clientConfManager.close();
   }
 
+  @Test
+  public void dynamicRemoteByHealthStrategyStorageTest() throws Exception {
+    final int updateIntervalSec = 2;
+    final String remotePath1 = "hdfs://host1/path1";
+    final String remotePath2 = "hdfs://host2/path2";
+    final String remotePath3 = "hdfs://host3/path3";
+    File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", ".conf").toFile();
+    cfgFile.deleteOnExit();
+    writeRemoteStorageConf(cfgFile, remotePath1);
+    createMiniHdfs(remotePath.getAbsolutePath());
+
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, updateIntervalSec);
+    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString());
+    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
+    conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+
+    ApplicationManager applicationManager = new ApplicationManager(conf);
+    // init IORankScheduler
+    Thread.sleep(2000);
+    LowestIOSampleCostSelectStorageStrategy selectStorageStrategy =
+        (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
+    Path testPath = new Path("/test");
+    FileSystem fs = testPath.getFileSystem(hdfsConf);
+    selectStorageStrategy.setFs(fs);
+
+    final ClientConfManager clientConfManager = new ClientConfManager(conf, new Configuration(), applicationManager);
+    Thread.sleep(500);
+    Set<String> expectedAvailablePath = Sets.newHashSet(remotePath1);
+    assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
+    selectStorageStrategy.sortPathByRankValue(remotePath1, testPath, System.currentTimeMillis());
+    RemoteStorageInfo remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId1");
+    assertEquals(remotePath1, remoteStorageInfo.getPath());
+    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+    writeRemoteStorageConf(cfgFile, remotePath3);
+    expectedAvailablePath = Sets.newHashSet(remotePath3);
+    waitForUpdate(expectedAvailablePath, applicationManager);
+    // The reason for setting the filesystem here is to trigger the execution of sortPathByRankValue
+    selectStorageStrategy.setFs(fs);
+    selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, System.currentTimeMillis());
+    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2");
+    assertEquals(remotePath3, remoteStorageInfo.getPath());
+
+    String confItems = "host2,k1=v1,k2=v2;host3,k3=v3";
+    final long current = System.currentTimeMillis();
+    writeRemoteStorageConf(cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, confItems);
+    expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3);
+    waitForUpdate(expectedAvailablePath, applicationManager);
+    selectStorageStrategy.setFs(fs);
+    selectStorageStrategy.sortPathByRankValue(remotePath2, testPath, current);
+    selectStorageStrategy.setFs(fs);
+    selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, current);
+    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3");
+    assertEquals(remotePath2, remoteStorageInfo.getPath());
+    assertEquals(2, remoteStorageInfo.getConfItems().size());
+    assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
+    assertEquals("v2", remoteStorageInfo.getConfItems().get("k2"));
+
+    confItems = "host1,keyTest1=test1,keyTest2=test2;host2,k1=deadbeaf";
+    writeRemoteStorageConf(cfgFile, remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2, confItems);
+    expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
+    waitForUpdate(expectedAvailablePath, applicationManager);
+    remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId4");
+    // either storage may be chosen; constant-first equals makes a missing key fail the assertion, not throw NPE
+    assertTrue(
+        (remotePath1.equals(remoteStorageInfo.getPath())
+            && remoteStorageInfo.getConfItems().size() == 2
+            && "test1".equals(remoteStorageInfo.getConfItems().get("keyTest1"))
+            && "test2".equals(remoteStorageInfo.getConfItems().get("keyTest2")))
+            || (remotePath2.equals(remoteStorageInfo.getPath())
+            && remoteStorageInfo.getConfItems().size() == 1
+            && "deadbeaf".equals(remoteStorageInfo.getConfItems().get("k1"))));
+
+    clientConfManager.close();
+  }
+
   private void writeRemoteStorageConf(File cfgFile, String value) throws Exception {
     writeRemoteStorageConf(cfgFile, value, null);
   }
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
new file mode 100644
index 00000000..41eec3e7
--- /dev/null
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.File;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.common.util.Constants;
+
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LowestIOSampleCostSelectStorageStrategyTest {
+
+  private LowestIOSampleCostSelectStorageStrategy selectStorageStrategy;
+  private ApplicationManager applicationManager;
+  private static final Configuration hdfsConf = new Configuration();
+  private static MiniDFSCluster cluster;
+  private final long appExpiredTime = 2000L;
+  private final String remoteStorage1 = "hdfs://p1";
+  private final String remoteStorage2 = "hdfs://p2";
+  private final String remoteStorage3 = "hdfs://p3";
+  private final Path testFile = new Path("test");
+
+  @TempDir
+  private static File remotePath = new File("hdfs://rss");
+
+  @BeforeAll
+  public static void setup() {
+    CoordinatorMetrics.register();
+  }
+
+  @AfterAll
+  public static void clear() {
+    CoordinatorMetrics.clear();
+    cluster.close();
+  }
+
+  @BeforeEach
+  public void init() throws Exception {
+    setUpHdfs(remotePath.getAbsolutePath());
+  }
+
+  public void setUpHdfs(String hdfsPath) throws Exception {
+    hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath());
+    hdfsConf.set("dfs.nameservices", "rss");
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath);
+    cluster = (new MiniDFSCluster.Builder(hdfsConf)).build();
+    Thread.sleep(500L);
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
+    conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 5000);
+    conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+    applicationManager = new ApplicationManager(conf);
+    selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
+    selectStorageStrategy.setConf(hdfsConf);
+    Thread.sleep(1000);
+  }
+
+  @Test
+  public void selectStorageTest() throws Exception {
+    FileSystem fs = testFile.getFileSystem(hdfsConf);
+    selectStorageStrategy.setFs(fs);
+
+    String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2;
+    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+    // default value is 0
+    assertEquals(0,
+        selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage1).getReadAndWriteTime().get());
+    assertEquals(0,
+        selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getReadAndWriteTime().get());
+    String storageHost1 = "p1";
+    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
+    String storageHost2 = "p2";
+    assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+
+    // compare two remote paths; each sample file is closed right after creation so no stream leaks
+    selectStorageStrategy.incRemoteStorageCounter(remoteStorage1);
+    selectStorageStrategy.incRemoteStorageCounter(remoteStorage1);
+    String testApp1 = "testApp1";
+    final long current = System.currentTimeMillis();
+    applicationManager.refreshAppId(testApp1);
+    fs.create(testFile).close();
+    selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, current);
+    fs.create(testFile).close();
+    selectStorageStrategy.sortPathByRankValue(remoteStorage1, testFile, current);
+    assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+    assertEquals(remoteStorage2, selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
+    assertEquals(1,
+        selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+    // return the same value if did the assignment already
+    assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+    assertEquals(1,
+        selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+
+    // when the expiration time is reached, the app was removed
+    Thread.sleep(appExpiredTime + 2000);
+    assertNull(selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1));
+    assertEquals(0,
+        selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+
+    // refresh app1, which is assigned remoteStorage2 again, then drop remoteStorage2 from the config;
+    // it should stay in the rank map until the app expires
+    applicationManager.refreshAppId(testApp1);
+    assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+    remoteStoragePath = remoteStorage1;
+    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+    assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1, remoteStorage2)),
+        selectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+    assertTrue(selectStorageStrategy.getRemoteStoragePathRankValue()
+        .get(remoteStorage2).getReadAndWriteTime().get() > 0);
+    assertEquals(1,
+        selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+    // app1 is expired, remoteStorage2 is removed because its app counter is 0
+    Thread.sleep(appExpiredTime + 2000);
+    assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1)),
+        selectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+    // restore previous manually inc for next test case
+    selectStorageStrategy.decRemoteStorageCounter(remoteStorage1);
+    selectStorageStrategy.decRemoteStorageCounter(remoteStorage1);
+    // remove all remote storage
+    applicationManager.refreshRemoteStorage("", "");
+    assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size());
+    assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size());
+    assertFalse(applicationManager.hasErrorInStatusCheck());
+  }
+
+  @Test
+  public void selectStorageMulThreadTest() throws Exception {
+    FileSystem fs = testFile.getFileSystem(hdfsConf);
+    selectStorageStrategy.setFs(fs);
+    String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2
+        + Constants.COMMA_SPLIT_CHAR + remoteStorage3;
+    applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+    String appPrefix = "testAppId";
+
+    Thread pickThread1 = new Thread(() -> {
+      for (int i = 0; i < 1000; i++) {
+        String appId = appPrefix + i;
+        applicationManager.refreshAppId(appId);
+        selectStorageStrategy.pickRemoteStorage(appId);
+      }
+    });
+
+    Thread pickThread2 = new Thread(() -> {
+      for (int i = 1000; i < 2000; i++) {
+        String appId = appPrefix + i;
+        applicationManager.refreshAppId(appId);
+        selectStorageStrategy.pickRemoteStorage(appId);
+      }
+    });
+
+    Thread pickThread3 = new Thread(() -> {
+      for (int i = 2000; i < 3000; i++) {
+        String appId = appPrefix + i;
+        applicationManager.refreshAppId(appId);
+        selectStorageStrategy.pickRemoteStorage(appId);
+      }
+    });
+    pickThread1.start();
+    pickThread2.start();
+    pickThread3.start();
+    pickThread1.join();
+    pickThread2.join();
+    pickThread3.join();
+    Thread.sleep(appExpiredTime + 2000);
+
+    applicationManager.refreshRemoteStorage("", "");
+    assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size());
+    assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size());
+    assertFalse(applicationManager.hasErrorInStatusCheck());
+  }
+}
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index cd71326d..f9c263a2 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -95,6 +95,10 @@ This document will introduce how to deploy Uniffle coordinators.
 |rss.coordinator.remote.storage.cluster.conf|-|Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'|
 |rss.rpc.server.port|-|RPC port for coordinator|
 |rss.jetty.http.port|-|Http port for coordinator|
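+|rss.coordinator.remote.storage.select.strategy|APP_BALANCE|Strategy for selecting the remote path: APP_BALANCE (based on the number of apps) or IO_SAMPLE (based on the time consumed reading and writing files)|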
+|rss.coordinator.remote.storage.io.sample.schedule.time|60000|The interval at which the scheduled thread reads and writes files on each remote path to measure its I/O time (used by IO_SAMPLE)|
+|rss.coordinator.remote.storage.io.sample.file.size|204800000|The size of the file that the scheduled thread reads and writes|
+|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times to read and write HDFS files|
 
 ### AccessClusterLoadChecker settings
 |Property Name|Default|	Description|
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
index c93723c6..73450d7d 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
@@ -39,6 +39,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -92,10 +93,11 @@ public class FetchClientConfTest extends CoordinatorTestBase {
     response = coordinatorClient.fetchClientConf(request);
     assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
     assertEquals(0, response.getClientConf().size());
+    shutdownServers();
   }
 
   @Test
-  public void testFetchRemoteStorage(@TempDir File tempDir) throws Exception {
+  public void testFetchRemoteStorageByApp(@TempDir File tempDir) throws Exception {
     String remotePath1 = "hdfs://path1";
     File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
     String contItem = "path2,key1=test1,key2=test2";
@@ -139,6 +141,59 @@ public class FetchClientConfTest extends CoordinatorTestBase {
     assertEquals(2, remoteStorageInfo.getConfItems().size());
     assertEquals("test1", remoteStorageInfo.getConfItems().get("key1"));
     assertEquals("test2", remoteStorageInfo.getConfItems().get("key2"));
+    shutdownServers();
+  }
+
+  @Test
+  public void testFetchRemoteStorageByIO(@TempDir File tempDir) throws Exception {
+    String remotePath1 = "hdfs://path1";
+    File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
+    String contItem = "path2,key1=test1,key2=test2";
+    Map<String, String> dynamicConf = Maps.newHashMap();
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath1);
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF.key(), contItem);
+    writeRemoteStorageConf(cfgFile, dynamicConf);
+
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
+    coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString());
+    coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 2);
+    coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 500);
+    coordinatorConf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+    createCoordinatorServer(coordinatorConf);
+    startServers();
+
+    waitForUpdate(Sets.newHashSet(remotePath1), coordinators.get(0).getApplicationManager());
+    String appId = "testFetchRemoteStorageApp";
+    RssFetchRemoteStorageRequest request = new RssFetchRemoteStorageRequest(appId);
+    RssFetchRemoteStorageResponse response = coordinatorClient.fetchRemoteStorage(request);
+    RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo();
+    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+    assertEquals(remotePath1, remoteStorageInfo.getPath());
+
+    // switch the remote storage path in the dynamic conf file to remotePath2
+    String remotePath2 = "hdfs://path2";
+    dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath2);
+    writeRemoteStorageConf(cfgFile, dynamicConf);
+    waitForUpdate(Sets.newHashSet(remotePath2), coordinators.get(0).getApplicationManager());
+    request = new RssFetchRemoteStorageRequest(appId);
+    response = coordinatorClient.fetchRemoteStorage(request);
+    // remotePath1 is still returned because (appId -> remote storage path) is in cache
+    remoteStorageInfo = response.getRemoteStorageInfo();
+    assertEquals(remotePath1, remoteStorageInfo.getPath());
+    assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+    // wait for the coordinator's storage selection state to refresh - TODO confirm what is awaited
+    Thread.sleep(2000);
+    request = new RssFetchRemoteStorageRequest(appId + "another");
+    response = coordinatorClient.fetchRemoteStorage(request);
+    // a new appId gets remotePath2 together with its cluster conf items
+    remoteStorageInfo = response.getRemoteStorageInfo();
+    assertEquals(remotePath2, remoteStorageInfo.getPath());
+    assertEquals(2, remoteStorageInfo.getConfItems().size());
+    assertEquals("test1", remoteStorageInfo.getConfItems().get("key1"));
+    assertEquals("test2", remoteStorageInfo.getConfItems().get("key2"));
+    shutdownServers();
   }
 
   private void waitForUpdate(