You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2022/10/28 07:37:15 UTC

[incubator-uniffle] branch master updated: [ISSUE-163][FEATURE] Write to hdfs when local disk can't be write (#235)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d4428ed [ISSUE-163][FEATURE] Write to hdfs when local disk can't be write (#235)
7d4428ed is described below

commit 7d4428ed22055f5f76db7b9e289945d276ddf716
Author: xianjingfeng <58...@qq.com>
AuthorDate: Fri Oct 28 15:37:09 2022 +0800

    [ISSUE-163][FEATURE] Write to hdfs when local disk can't be write (#235)
    
    What changes were proposed in this pull request?
    Write to HDFS when the local disk can't be written to.
    
    Why are the changes needed?
    There should be a fallback mechanism when the disk can't be written to. See #163.
    
    Does this PR introduce any user-facing change?
    No
    
    How was this patch tested?
    Unit tests were added (MultiStorageManagerTest and StorageManagerFallbackStrategyTest).
---
 docs/server_guide.md                               |   1 +
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |   4 +-
 .../apache/uniffle/server/ShuffleFlushManager.java |  24 ++--
 .../apache/uniffle/server/ShuffleServerConf.java   |  12 ++
 .../AbstractStorageManagerFallbackStrategy.java    |  65 +++++++++++
 .../HdfsStorageManagerFallbackStrategy.java        |  44 +++++++
 .../LocalStorageManagerFallbackStrategy.java       |  44 +++++++
 .../server/storage/MultiStorageManager.java        |  83 +++++++++++---
 .../RotateStorageManagerFallbackStrategy.java      |  39 +++++++
 .../server/storage/SingleStorageManager.java       |  16 +++
 .../uniffle/server/storage/StorageManager.java     |   2 +
 .../server/storage/MultiStorageManagerTest.java    |  25 ++++
 .../StorageManagerFallbackStrategyTest.java        | 126 +++++++++++++++++++++
 13 files changed, 454 insertions(+), 31 deletions(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index 5e2a97ec..efdf767a 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -82,3 +82,4 @@ This document will introduce how to deploy Uniffle shuffle servers.
 |rss.server.single.buffer.flush.enabled|false|Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold|
 |rss.server.single.buffer.flush.threshold|64M|The threshold of single shuffle buffer flush|
 |rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If it's negative, it will use the default disk whole space|
+|rss.server.multistorage.fallback.strategy.class|-|The fallback strategy class for the `MEMORY_LOCALFILE_HDFS` storage type. Supported values are `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`, `org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` is used.|
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 66eb4513..be51772a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -63,6 +63,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerGrpcMetrics;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -369,7 +370,8 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
         EVENT_THRESHOLD_SIZE + 1, null, null, null);
     try {
       // can't find storage info with appId2
-      shuffleServers.get(0).getStorageManager().selectStorage(event2).getStoragePath();
+      ((MultiStorageManager)shuffleServers.get(0).getStorageManager()).getColdStorageManager()
+          .selectStorage(event2).getStoragePath();
       fail("Exception should be thrown with un-register appId");
     } catch (Exception e) {
       // expected exception, ignore
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 6c6ece9e..3c2a190a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -166,18 +166,18 @@ public class ShuffleFlushManager {
               shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
               StringUtils.EMPTY
           );
-          ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new CreateShuffleWriteHandlerRequest(
-              storageType,
-              event.getAppId(),
-              event.getShuffleId(),
-              event.getStartPartition(),
-              event.getEndPartition(),
-              storageBasePaths.toArray(new String[storageBasePaths.size()]),
-              shuffleServerId,
-              hadoopConf,
-              storageDataReplica,
-              user)
-          );
+          CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
+                  storageType,
+                  event.getAppId(),
+                  event.getShuffleId(),
+                  event.getStartPartition(),
+                  event.getEndPartition(),
+                  storageBasePaths.toArray(new String[storageBasePaths.size()]),
+                  shuffleServerId,
+                  hadoopConf,
+                  storageDataReplica,
+                  user);
+          ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
           do {
             if (event.getRetryTimes() > retryMax) {
               LOG.error("Failed to write data for " + event + " in " + retryMax + " times, shuffle data will be lost");
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 9e580842..b8bdf7ac 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -263,6 +263,12 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(64L * 1024L * 1024L)
       .withDescription("For multistorage, the event size exceed this value, flush data  to cold storage");
 
+  public static final ConfigOption<String> MULTISTORAGE_FALLBACK_STRATEGY_CLASS = ConfigOptions
+      .key("rss.server.multistorage.fallback.strategy.class")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("For multistorage, fallback strategy class");
+
   public static final ConfigOption<Long> FALLBACK_MAX_FAIL_TIMES = ConfigOptions
       .key("rss.server.multistorage.fallback.max.fail.times")
       .longType()
@@ -296,6 +302,12 @@ public class ShuffleServerConf extends RssBaseConf {
         .defaultValue(64 * 1024 * 1024L)
         .withDescription("The threshold of single shuffle buffer flush");
 
+  public static final ConfigOption<Long> STORAGEMANAGER_CACHE_TIMEOUT = ConfigOptions
+      .key("rss.server.multistorage.storagemanager.cache.timeout")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("The timeout of the cache which record the mapping information");
+
   public ShuffleServerConf() {
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..e07080bb
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+public abstract class AbstractStorageManagerFallbackStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractStorageManagerFallbackStrategy.class);
+  protected ShuffleServerConf conf;
+
+  public AbstractStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+    this.conf = conf;
+  }
+  
+  public abstract StorageManager tryFallback(
+      StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates);
+
+  protected StorageManager findNextStorageManager(
+      StorageManager current, Set<Class<? extends StorageManager>> excludeTypes,
+      ShuffleDataFlushEvent event, StorageManager... candidates) {
+    int nextIdx = -1;
+    for (int i = 0; i < candidates.length; i++) {
+      if (current == candidates[i]) {
+        nextIdx = (i + 1) % candidates.length;
+        break;
+      }
+    }
+    if (nextIdx == -1) {
+      throw new RuntimeException("Current StorageManager is not in candidates");
+    }
+    for (int i = 0; i < candidates.length - 1; i++) {
+      StorageManager storageManager = candidates[(i + nextIdx) % candidates.length];
+      if (excludeTypes != null && excludeTypes.contains(storageManager.getClass())) {
+        continue;
+      }
+      if (!storageManager.canWrite(event)) {
+        continue;
+      }
+      return storageManager;
+    }
+    LOG.warn("Find next storageManager failed, all candidates are not available.");
+    return current;
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..5f92bb6f
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManagerFallbackStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+
+public class HdfsStorageManagerFallbackStrategy extends AbstractStorageManagerFallbackStrategy {
+  private final Long fallBackTimes;
+  private Set<Class<? extends StorageManager>> excludeTypes = Sets.newHashSet(HdfsStorageManager.class);
+
+  public HdfsStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+    super(conf);
+    fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
+  }
+
+  @Override
+  public StorageManager tryFallback(StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) {
+    if (event.getRetryTimes() > fallBackTimes) {
+      return findNextStorageManager(current, excludeTypes, event, candidates);
+    }
+    return current;
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..d46111b9
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+
+public class LocalStorageManagerFallbackStrategy extends AbstractStorageManagerFallbackStrategy {
+  private final Long fallBackTimes;
+  private Set<Class<? extends StorageManager>> excludeTypes = Sets.newHashSet(LocalStorageManager.class);
+
+  public LocalStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+    super(conf);
+    fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
+  }
+  
+  @Override
+  public StorageManager tryFallback(StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) {
+    if (event.getRetryTimes() > fallBackTimes) {
+      return findNextStorageManager(current, excludeTypes, event, candidates);
+    }
+    return current;
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 2699a3f9..942ae5bb 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -18,7 +18,11 @@
 package org.apache.uniffle.server.storage;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +43,39 @@ public class MultiStorageManager implements StorageManager {
   private final StorageManager warmStorageManager;
   private final StorageManager coldStorageManager;
   private final long flushColdStorageThresholdSize;
-  private final long fallBackTimes;
+  private AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy;
+  private Cache<ShuffleDataFlushEvent, StorageManager> storageManagerCache;
 
   MultiStorageManager(ShuffleServerConf conf) {
     warmStorageManager = new LocalStorageManager(conf);
     coldStorageManager = new HdfsStorageManager(conf);
-    fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
     flushColdStorageThresholdSize = conf.getSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE);
+    try {
+      storageManagerFallbackStrategy = loadFallbackStrategy(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Load fallback strategy failed.", e);
+    }
+    long cacheTimeout = conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
+    storageManagerCache = CacheBuilder.newBuilder()
+        .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  public static AbstractStorageManagerFallbackStrategy loadFallbackStrategy(
+      ShuffleServerConf conf) throws Exception {
+    String name = conf.getString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+        HdfsStorageManagerFallbackStrategy.class.getCanonicalName());
+    Class<?> klass = Class.forName(name);
+    Constructor<?> constructor;
+    AbstractStorageManagerFallbackStrategy instance;
+    try {
+      constructor = klass.getConstructor(conf.getClass(), Boolean.TYPE);
+      instance = (AbstractStorageManagerFallbackStrategy) constructor.newInstance(conf);
+    } catch (NoSuchMethodException e) {
+      constructor = klass.getConstructor(conf.getClass());
+      instance = (AbstractStorageManagerFallbackStrategy) constructor.newInstance(conf);
+    }
+    return instance;
   }
 
   @Override
@@ -65,39 +95,52 @@ public class MultiStorageManager implements StorageManager {
 
   @Override
   public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
-    selectStorageManager(event).updateWriteMetrics(event, writeTime);
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
     StorageManager storageManager = selectStorageManager(event);
-    if (storageManager == coldStorageManager && event.getRetryTimes() > fallBackTimes) {
+    if (event.getRetryTimes() > 0) {
       try {
-        CreateShuffleWriteHandlerRequest request = storage.getCreateWriterHandlerRequest(
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition());
-        if (request == null) {
-          return false;
+        StorageManager newStorageManager = storageManagerFallbackStrategy.tryFallback(
+                storageManager, event, warmStorageManager, coldStorageManager);
+        if (newStorageManager != storageManager) {
+          storageManager = newStorageManager;
+          storageManagerCache.put(event, storageManager);
+          CreateShuffleWriteHandlerRequest request = storage.getCreateWriterHandlerRequest(
+              event.getAppId(), event.getShuffleId(), event.getStartPartition());
+          storage = storageManager.selectStorage(event);
+          handler = storage.getOrCreateWriteHandler(request);
         }
-        storage = warmStorageManager.selectStorage(event);
-        handler = storage.getOrCreateWriteHandler(request);
       } catch (IOException ioe) {
         LOG.warn("Create fallback write handler failed ", ioe);
-        return false;
       }
-      return warmStorageManager.write(storage, handler, event);
-    } else {
-      return storageManager.write(storage, handler, event);
     }
+    boolean success = storageManager.write(storage, handler, event);
+    if (success) {
+      storageManagerCache.invalidate(event);
+    }
+    return success;
   }
 
   private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
+    StorageManager storageManager = storageManagerCache.getIfPresent(event);
+    if (storageManager != null) {
+      return storageManager;
+    }
     if (event.getSize() > flushColdStorageThresholdSize) {
-      return coldStorageManager;
+      storageManager = coldStorageManager;
     } else {
-      return warmStorageManager;
+      storageManager = warmStorageManager;
+    }
+
+    if (!storageManager.canWrite(event)) {
+      storageManager = storageManagerFallbackStrategy.tryFallback(
+          storageManager, event, warmStorageManager, coldStorageManager);
     }
+    storageManagerCache.put(event, storageManager);
+    return storageManager;
   }
 
   public void start() {
@@ -112,6 +155,10 @@ public class MultiStorageManager implements StorageManager {
   }
 
   @Override
+  public boolean canWrite(ShuffleDataFlushEvent event) {
+    return warmStorageManager.canWrite(event) || coldStorageManager.canWrite(event);
+  }
+
   public void removeResources(PurgeEvent event) {
     LOG.info("Start to remove resource of {}", event);
     warmStorageManager.removeResources(event);
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..b7eea237
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+public class RotateStorageManagerFallbackStrategy extends AbstractStorageManagerFallbackStrategy {
+  private final Long fallBackTimes;
+
+  public RotateStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+    super(conf);
+    fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
+  }
+
+  @Override
+  public StorageManager tryFallback(StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) {
+    if (fallBackTimes > 0
+        && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() % fallBackTimes > 0)) {
+      return current;
+    }
+    return findNextStorageManager(current, null, event, candidates);
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 299b6a4c..6219cb54 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -113,6 +113,22 @@ public abstract class SingleStorageManager implements StorageManager {
     }
   }
 
+
+  @Override
+  public boolean canWrite(ShuffleDataFlushEvent event) {
+    try {
+      Storage storage = selectStorage(event);
+      // if storage is null, appId may not be registered
+      if (storage == null || !storage.canWrite()) {
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      LOG.warn("Exception happened when select storage", e);
+      return false;
+    }
+  }
+  
   public StorageWriteMetrics createStorageWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
     long length = 0;
     long blockNum = 0;
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 0960ca37..a46bea29 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -47,6 +47,8 @@ public interface StorageManager {
   void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo);
 
   Checker getStorageChecker();
+  
+  boolean canWrite(ShuffleDataFlushEvent event);
 
   // todo: add an interface that check storage isHealthy
 }
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index e62fb18c..1c6075aa 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.common.HdfsStorage;
 import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -54,4 +55,28 @@ public class MultiStorageManagerTest {
         1, appId, 1, 1, 1, 1000000, blocks, null, null);
     assertTrue((manager.selectStorage(event) instanceof HdfsStorage));
   }
+
+  @Test
+  public void selectStorageManagerIfCanNotWriteTest() {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 10000L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+    conf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS, 
+        RotateStorageManagerFallbackStrategy.class.getCanonicalName());
+    MultiStorageManager manager = new MultiStorageManager(conf);
+    String remoteStorage = "test";
+    String appId = "selectStorageManagerIfCanNotWriteTest_appId";
+    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 1000, blocks, null, null);
+    Storage storage = manager.selectStorage(event);
+    assertTrue((storage instanceof LocalStorage));
+    ((LocalStorage)storage).markCorrupted();
+    event = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 1000, blocks, null, null);
+    assertTrue((manager.selectStorage(event) instanceof HdfsStorage));
+  }
 }
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/StorageManagerFallbackStrategyTest.java b/server/src/test/java/org/apache/uniffle/server/storage/StorageManagerFallbackStrategyTest.java
new file mode 100644
index 00000000..c6e37a26
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/storage/StorageManagerFallbackStrategyTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StorageManagerFallbackStrategyTest {
+  private ShuffleServerConf conf;
+  
+  @BeforeEach
+  public void prepare() {
+    conf = new ShuffleServerConf();
+    conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 10000L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+  }
+
+  @Test
+  public void testDefaultFallbackStrategy() {
+    RotateStorageManagerFallbackStrategy fallbackStrategy = new RotateStorageManagerFallbackStrategy(conf);
+    LocalStorageManager warmStorageManager = new LocalStorageManager(conf);
+    HdfsStorageManager coldStorageManager = new HdfsStorageManager(conf);
+    StorageManager current = warmStorageManager;
+    String remoteStorage = "test";
+    String appId = "testDefaultFallbackStrategy_appId";
+    coldStorageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+        new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 1000, blocks, null, null);
+    event.increaseRetryTimes();
+    StorageManager storageManager = fallbackStrategy.tryFallback(
+        current, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == coldStorageManager);
+
+    conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 3);
+    fallbackStrategy = new RotateStorageManagerFallbackStrategy(conf);
+    storageManager = fallbackStrategy.tryFallback(current, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == warmStorageManager);
+    for (int i = 0; i < 2; i++) {
+      event.increaseRetryTimes();
+    }
+    storageManager = fallbackStrategy.tryFallback(current, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == coldStorageManager);
+    event.increaseRetryTimes();
+    storageManager = fallbackStrategy.tryFallback(current, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == warmStorageManager);
+    for (int i = 0; i < 2; i++) {
+      event.increaseRetryTimes();
+    }
+    storageManager = fallbackStrategy.tryFallback(coldStorageManager, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == warmStorageManager);
+  }
+
+  @Test
+  public void testHdfsFallbackStrategy() {
+    HdfsStorageManagerFallbackStrategy fallbackStrategy = new HdfsStorageManagerFallbackStrategy(conf);
+    LocalStorageManager warmStorageManager = new LocalStorageManager(conf);
+    HdfsStorageManager coldStorageManager = new HdfsStorageManager(conf);
+    String remoteStorage = "test";
+    String appId = "testHdfsFallbackStrategy_appId";
+    coldStorageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+        new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 1000, blocks, null, null);
+    event.increaseRetryTimes();
+    StorageManager storageManager = fallbackStrategy.tryFallback(
+        warmStorageManager, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == warmStorageManager);
+
+    storageManager = fallbackStrategy.tryFallback(coldStorageManager, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == warmStorageManager);
+  }
+
+
+  @Test
+  public void testLocalFallbackStrategy() {
+    LocalStorageManagerFallbackStrategy fallbackStrategy = new LocalStorageManagerFallbackStrategy(conf);
+    LocalStorageManager warmStorageManager = new LocalStorageManager(conf);
+    HdfsStorageManager coldStorageManager = new HdfsStorageManager(conf);
+    String remoteStorage = "test";
+    String appId = "testLocalFallbackStrategy_appId";
+    coldStorageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+    List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+        new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+    ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+        1, appId, 1, 1, 1, 1000, blocks, null, null);
+    event.increaseRetryTimes();
+    StorageManager storageManager = fallbackStrategy.tryFallback(
+        warmStorageManager, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == coldStorageManager);
+
+    storageManager = fallbackStrategy.tryFallback(coldStorageManager, event, warmStorageManager, coldStorageManager);
+    assertTrue(storageManager == coldStorageManager);
+  }
+}