You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2022/10/28 07:37:15 UTC
[incubator-uniffle] branch master updated: [ISSUE-163][FEATURE] Write to hdfs when local disk can't be write (#235)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7d4428ed [ISSUE-163][FEATURE] Write to hdfs when local disk can't be write (#235)
7d4428ed is described below
commit 7d4428ed22055f5f76db7b9e289945d276ddf716
Author: xianjingfeng <58...@qq.com>
AuthorDate: Fri Oct 28 15:37:09 2022 +0800
[ISSUE-163][FEATURE] Write to hdfs when local disk can't be write (#235)
What changes were proposed in this pull request?
Write to HDFS when the local disk can't be written to.
Why are the changes needed?
There should be a fallback mechanism when the local disk can't be written to. See issue #163.
Does this PR introduce any user-facing change?
No
How was this patch tested?
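Added unit tests: `MultiStorageManagerTest#selectStorageManagerIfCanNotWriteTest` and `StorageManagerFallbackStrategyTest`.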
---
docs/server_guide.md | 1 +
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 4 +-
.../apache/uniffle/server/ShuffleFlushManager.java | 24 ++--
.../apache/uniffle/server/ShuffleServerConf.java | 12 ++
.../AbstractStorageManagerFallbackStrategy.java | 65 +++++++++++
.../HdfsStorageManagerFallbackStrategy.java | 44 +++++++
.../LocalStorageManagerFallbackStrategy.java | 44 +++++++
.../server/storage/MultiStorageManager.java | 83 +++++++++++---
.../RotateStorageManagerFallbackStrategy.java | 39 +++++++
.../server/storage/SingleStorageManager.java | 16 +++
.../uniffle/server/storage/StorageManager.java | 2 +
.../server/storage/MultiStorageManagerTest.java | 25 ++++
.../StorageManagerFallbackStrategyTest.java | 126 +++++++++++++++++++++
13 files changed, 454 insertions(+), 31 deletions(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 5e2a97ec..efdf767a 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -82,3 +82,4 @@ This document will introduce how to deploy Uniffle shuffle servers.
|rss.server.single.buffer.flush.enabled|false|Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold|
|rss.server.single.buffer.flush.threshold|64M|The threshold of single shuffle buffer flush|
|rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If it's negative, it will use the default disk whole space|
+|rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Supported values are `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`, `org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` is used.|
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 66eb4513..be51772a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -63,6 +63,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerGrpcMetrics;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -369,7 +370,8 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
EVENT_THRESHOLD_SIZE + 1, null, null, null);
try {
// can't find storage info with appId2
- shuffleServers.get(0).getStorageManager().selectStorage(event2).getStoragePath();
+ ((MultiStorageManager)shuffleServers.get(0).getStorageManager()).getColdStorageManager()
+ .selectStorage(event2).getStoragePath();
fail("Exception should be thrown with un-register appId");
} catch (Exception e) {
// expected exception, ignore
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 6c6ece9e..3c2a190a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -166,18 +166,18 @@ public class ShuffleFlushManager {
shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
StringUtils.EMPTY
);
- ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new CreateShuffleWriteHandlerRequest(
- storageType,
- event.getAppId(),
- event.getShuffleId(),
- event.getStartPartition(),
- event.getEndPartition(),
- storageBasePaths.toArray(new String[storageBasePaths.size()]),
- shuffleServerId,
- hadoopConf,
- storageDataReplica,
- user)
- );
+ CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
+ storageType,
+ event.getAppId(),
+ event.getShuffleId(),
+ event.getStartPartition(),
+ event.getEndPartition(),
+ storageBasePaths.toArray(new String[storageBasePaths.size()]),
+ shuffleServerId,
+ hadoopConf,
+ storageDataReplica,
+ user);
+ ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
do {
if (event.getRetryTimes() > retryMax) {
LOG.error("Failed to write data for " + event + " in " + retryMax + " times, shuffle data will be lost");
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 9e580842..b8bdf7ac 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -263,6 +263,12 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(64L * 1024L * 1024L)
.withDescription("For multistorage, the event size exceed this value, flush data to cold storage");
+ public static final ConfigOption<String> MULTISTORAGE_FALLBACK_STRATEGY_CLASS = ConfigOptions
+ .key("rss.server.multistorage.fallback.strategy.class")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("For multistorage, fallback strategy class");
+
public static final ConfigOption<Long> FALLBACK_MAX_FAIL_TIMES = ConfigOptions
.key("rss.server.multistorage.fallback.max.fail.times")
.longType()
@@ -296,6 +302,12 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(64 * 1024 * 1024L)
.withDescription("The threshold of single shuffle buffer flush");
+ public static final ConfigOption<Long> STORAGEMANAGER_CACHE_TIMEOUT = ConfigOptions
+ .key("rss.server.multistorage.storagemanager.cache.timeout")
+ .longType()
+ .defaultValue(60 * 1000L)
+ .withDescription("The timeout of the cache which record the mapping information");
+
public ShuffleServerConf() {
}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..e07080bb
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+public abstract class AbstractStorageManagerFallbackStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractStorageManagerFallbackStrategy.class);
+ protected ShuffleServerConf conf;
+
+ public AbstractStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+ this.conf = conf;
+ }
+
+ public abstract StorageManager tryFallback(
+ StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates);
+
+ protected StorageManager findNextStorageManager(
+ StorageManager current, Set<Class<? extends StorageManager>> excludeTypes,
+ ShuffleDataFlushEvent event, StorageManager... candidates) {
+ int nextIdx = -1;
+ for (int i = 0; i < candidates.length; i++) {
+ if (current == candidates[i]) {
+ nextIdx = (i + 1) % candidates.length;
+ break;
+ }
+ }
+ if (nextIdx == -1) {
+ throw new RuntimeException("Current StorageManager is not in candidates");
+ }
+ for (int i = 0; i < candidates.length - 1; i++) {
+ StorageManager storageManager = candidates[(i + nextIdx) % candidates.length];
+ if (excludeTypes != null && excludeTypes.contains(storageManager.getClass())) {
+ continue;
+ }
+ if (!storageManager.canWrite(event)) {
+ continue;
+ }
+ return storageManager;
+ }
+ LOG.warn("Find next storageManager failed, all candidates are not available.");
+ return current;
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..5f92bb6f
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManagerFallbackStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+
+public class HdfsStorageManagerFallbackStrategy extends AbstractStorageManagerFallbackStrategy {
+ private final Long fallBackTimes;
+ private Set<Class<? extends StorageManager>> excludeTypes = Sets.newHashSet(HdfsStorageManager.class);
+
+ public HdfsStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+ super(conf);
+ fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
+ }
+
+ @Override
+ public StorageManager tryFallback(StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) {
+ if (event.getRetryTimes() > fallBackTimes) {
+ return findNextStorageManager(current, excludeTypes, event, candidates);
+ }
+ return current;
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..d46111b9
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+
+public class LocalStorageManagerFallbackStrategy extends AbstractStorageManagerFallbackStrategy {
+ private final Long fallBackTimes;
+ private Set<Class<? extends StorageManager>> excludeTypes = Sets.newHashSet(LocalStorageManager.class);
+
+ public LocalStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+ super(conf);
+ fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
+ }
+
+ @Override
+ public StorageManager tryFallback(StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) {
+ if (event.getRetryTimes() > fallBackTimes) {
+ return findNextStorageManager(current, excludeTypes, event, candidates);
+ }
+ return current;
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 2699a3f9..942ae5bb 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -18,7 +18,11 @@
package org.apache.uniffle.server.storage;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,13 +43,39 @@ public class MultiStorageManager implements StorageManager {
private final StorageManager warmStorageManager;
private final StorageManager coldStorageManager;
private final long flushColdStorageThresholdSize;
- private final long fallBackTimes;
+ private AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy;
+ private Cache<ShuffleDataFlushEvent, StorageManager> storageManagerCache;
MultiStorageManager(ShuffleServerConf conf) {
warmStorageManager = new LocalStorageManager(conf);
coldStorageManager = new HdfsStorageManager(conf);
- fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
flushColdStorageThresholdSize = conf.getSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE);
+ try {
+ storageManagerFallbackStrategy = loadFallbackStrategy(conf);
+ } catch (Exception e) {
+ throw new RuntimeException("Load fallback strategy failed.", e);
+ }
+ long cacheTimeout = conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
+ storageManagerCache = CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ public static AbstractStorageManagerFallbackStrategy loadFallbackStrategy(
+ ShuffleServerConf conf) throws Exception {
+ String name = conf.getString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+ HdfsStorageManagerFallbackStrategy.class.getCanonicalName());
+ Class<?> klass = Class.forName(name);
+ Constructor<?> constructor;
+ AbstractStorageManagerFallbackStrategy instance;
+ try {
+ constructor = klass.getConstructor(conf.getClass(), Boolean.TYPE);
+ instance = (AbstractStorageManagerFallbackStrategy) constructor.newInstance(conf);
+ } catch (NoSuchMethodException e) {
+ constructor = klass.getConstructor(conf.getClass());
+ instance = (AbstractStorageManagerFallbackStrategy) constructor.newInstance(conf);
+ }
+ return instance;
}
@Override
@@ -65,39 +95,52 @@ public class MultiStorageManager implements StorageManager {
@Override
public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
- selectStorageManager(event).updateWriteMetrics(event, writeTime);
+ throw new UnsupportedOperationException();
}
@Override
public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
StorageManager storageManager = selectStorageManager(event);
- if (storageManager == coldStorageManager && event.getRetryTimes() > fallBackTimes) {
+ if (event.getRetryTimes() > 0) {
try {
- CreateShuffleWriteHandlerRequest request = storage.getCreateWriterHandlerRequest(
- event.getAppId(),
- event.getShuffleId(),
- event.getStartPartition());
- if (request == null) {
- return false;
+ StorageManager newStorageManager = storageManagerFallbackStrategy.tryFallback(
+ storageManager, event, warmStorageManager, coldStorageManager);
+ if (newStorageManager != storageManager) {
+ storageManager = newStorageManager;
+ storageManagerCache.put(event, storageManager);
+ CreateShuffleWriteHandlerRequest request = storage.getCreateWriterHandlerRequest(
+ event.getAppId(), event.getShuffleId(), event.getStartPartition());
+ storage = storageManager.selectStorage(event);
+ handler = storage.getOrCreateWriteHandler(request);
}
- storage = warmStorageManager.selectStorage(event);
- handler = storage.getOrCreateWriteHandler(request);
} catch (IOException ioe) {
LOG.warn("Create fallback write handler failed ", ioe);
- return false;
}
- return warmStorageManager.write(storage, handler, event);
- } else {
- return storageManager.write(storage, handler, event);
}
+ boolean success = storageManager.write(storage, handler, event);
+ if (success) {
+ storageManagerCache.invalidate(event);
+ }
+ return success;
}
private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
+ StorageManager storageManager = storageManagerCache.getIfPresent(event);
+ if (storageManager != null) {
+ return storageManager;
+ }
if (event.getSize() > flushColdStorageThresholdSize) {
- return coldStorageManager;
+ storageManager = coldStorageManager;
} else {
- return warmStorageManager;
+ storageManager = warmStorageManager;
+ }
+
+ if (!storageManager.canWrite(event)) {
+ storageManager = storageManagerFallbackStrategy.tryFallback(
+ storageManager, event, warmStorageManager, coldStorageManager);
}
+ storageManagerCache.put(event, storageManager);
+ return storageManager;
}
public void start() {
@@ -112,6 +155,10 @@ public class MultiStorageManager implements StorageManager {
}
@Override
+ public boolean canWrite(ShuffleDataFlushEvent event) {
+ return warmStorageManager.canWrite(event) || coldStorageManager.canWrite(event);
+ }
+
public void removeResources(PurgeEvent event) {
LOG.info("Start to remove resource of {}", event);
warmStorageManager.removeResources(event);
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
new file mode 100644
index 00000000..b7eea237
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+public class RotateStorageManagerFallbackStrategy extends AbstractStorageManagerFallbackStrategy {
+ private final Long fallBackTimes;
+
+ public RotateStorageManagerFallbackStrategy(ShuffleServerConf conf) {
+ super(conf);
+ fallBackTimes = conf.get(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES);
+ }
+
+ @Override
+ public StorageManager tryFallback(StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) {
+ if (fallBackTimes > 0
+ && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() % fallBackTimes > 0)) {
+ return current;
+ }
+ return findNextStorageManager(current, null, event, candidates);
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 299b6a4c..6219cb54 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -113,6 +113,22 @@ public abstract class SingleStorageManager implements StorageManager {
}
}
+
+ @Override
+ public boolean canWrite(ShuffleDataFlushEvent event) {
+ try {
+ Storage storage = selectStorage(event);
+ // if storage is null, appId may not be registered
+ if (storage == null || !storage.canWrite()) {
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.warn("Exception happened when select storage", e);
+ return false;
+ }
+ }
+
public StorageWriteMetrics createStorageWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
long length = 0;
long blockNum = 0;
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 0960ca37..a46bea29 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -47,6 +47,8 @@ public interface StorageManager {
void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo);
Checker getStorageChecker();
+
+ boolean canWrite(ShuffleDataFlushEvent event);
// todo: add an interface that check storage isHealthy
}
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index e62fb18c..1c6075aa 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.common.HdfsStorage;
import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -54,4 +55,28 @@ public class MultiStorageManagerTest {
1, appId, 1, 1, 1, 1000000, blocks, null, null);
assertTrue((manager.selectStorage(event) instanceof HdfsStorage));
}
+
+ @Test
+ public void selectStorageManagerIfCanNotWriteTest() {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+ conf.setLong(ShuffleServerConf.DISK_CAPACITY, 10000L);
+ conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+ conf.setString(ShuffleServerConf.MULTISTORAGE_FALLBACK_STRATEGY_CLASS,
+ RotateStorageManagerFallbackStrategy.class.getCanonicalName());
+ MultiStorageManager manager = new MultiStorageManager(conf);
+ String remoteStorage = "test";
+ String appId = "selectStorageManagerIfCanNotWriteTest_appId";
+ manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 1000, blocks, null, null);
+ Storage storage = manager.selectStorage(event);
+ assertTrue((storage instanceof LocalStorage));
+ ((LocalStorage)storage).markCorrupted();
+ event = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 1000, blocks, null, null);
+ assertTrue((manager.selectStorage(event) instanceof HdfsStorage));
+ }
}
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/StorageManagerFallbackStrategyTest.java b/server/src/test/java/org/apache/uniffle/server/storage/StorageManagerFallbackStrategyTest.java
new file mode 100644
index 00000000..c6e37a26
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/storage/StorageManagerFallbackStrategyTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StorageManagerFallbackStrategyTest {
+ private ShuffleServerConf conf;
+
+ @BeforeEach
+ public void prepare() {
+ conf = new ShuffleServerConf();
+ conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+ conf.setLong(ShuffleServerConf.DISK_CAPACITY, 10000L);
+ conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
+ }
+
+ @Test
+ public void testDefaultFallbackStrategy() {
+ RotateStorageManagerFallbackStrategy fallbackStrategy = new RotateStorageManagerFallbackStrategy(conf);
+ LocalStorageManager warmStorageManager = new LocalStorageManager(conf);
+ HdfsStorageManager coldStorageManager = new HdfsStorageManager(conf);
+ StorageManager current = warmStorageManager;
+ String remoteStorage = "test";
+ String appId = "testDefaultFallbackStrategy_appId";
+ coldStorageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+ new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 1000, blocks, null, null);
+ event.increaseRetryTimes();
+ StorageManager storageManager = fallbackStrategy.tryFallback(
+ current, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == coldStorageManager);
+
+ conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 3);
+ fallbackStrategy = new RotateStorageManagerFallbackStrategy(conf);
+ storageManager = fallbackStrategy.tryFallback(current, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == warmStorageManager);
+ for (int i = 0; i < 2; i++) {
+ event.increaseRetryTimes();
+ }
+ storageManager = fallbackStrategy.tryFallback(current, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == coldStorageManager);
+ event.increaseRetryTimes();
+ storageManager = fallbackStrategy.tryFallback(current, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == warmStorageManager);
+ for (int i = 0; i < 2; i++) {
+ event.increaseRetryTimes();
+ }
+ storageManager = fallbackStrategy.tryFallback(coldStorageManager, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == warmStorageManager);
+ }
+
+ @Test
+ public void testHdfsFallbackStrategy() {
+ HdfsStorageManagerFallbackStrategy fallbackStrategy = new HdfsStorageManagerFallbackStrategy(conf);
+ LocalStorageManager warmStorageManager = new LocalStorageManager(conf);
+ HdfsStorageManager coldStorageManager = new HdfsStorageManager(conf);
+ String remoteStorage = "test";
+ String appId = "testHdfsFallbackStrategy_appId";
+ coldStorageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+ new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 1000, blocks, null, null);
+ event.increaseRetryTimes();
+ StorageManager storageManager = fallbackStrategy.tryFallback(
+ warmStorageManager, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == warmStorageManager);
+
+ storageManager = fallbackStrategy.tryFallback(coldStorageManager, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == warmStorageManager);
+ }
+
+
+ @Test
+ public void testLocalFallbackStrategy() {
+ LocalStorageManagerFallbackStrategy fallbackStrategy = new LocalStorageManagerFallbackStrategy(conf);
+ LocalStorageManager warmStorageManager = new LocalStorageManager(conf);
+ HdfsStorageManager coldStorageManager = new HdfsStorageManager(conf);
+ String remoteStorage = "test";
+ String appId = "testLocalFallbackStrategy_appId";
+ coldStorageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+ List<ShufflePartitionedBlock> blocks = Lists.newArrayList(
+ new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, null));
+ ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(
+ 1, appId, 1, 1, 1, 1000, blocks, null, null);
+ event.increaseRetryTimes();
+ StorageManager storageManager = fallbackStrategy.tryFallback(
+ warmStorageManager, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == coldStorageManager);
+
+ storageManager = fallbackStrategy.tryFallback(coldStorageManager, event, warmStorageManager, coldStorageManager);
+ assertTrue(storageManager == coldStorageManager);
+ }
+}