You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/07 14:19:27 UTC
[incubator-uniffle] branch master updated: [ISSUE-186][Feature] Use I/O cost time to select storage paths (#192)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 78a03711 [ISSUE-186][Feature] Use I/O cost time to select storage paths (#192)
78a03711 is described below
commit 78a037114b5ec02fb27c0e3728798b716da6a2c0
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Wed Sep 7 22:19:22 2022 +0800
[ISSUE-186][Feature] Use I/O cost time to select storage paths (#192)
### What changes were proposed in this pull request?
Solve #186
### Why are the changes needed?
A better strategy to select remote path.
### Does this PR introduce _any_ user-facing change?
Four configs added.
1. `rss.coordinator.remote.storage.select.strategy` supports `APP_BALANCE` and `IO_SAMPLE`. `APP_BALANCE` selects the remote path based on the number of apps on each path; `IO_SAMPLE` selects it based on the time spent reading and writing sample files.
2. `rss.coordinator.remote.storage.io.sample.schedule.time`, if the user chooses `IO_SAMPLE`, the sample file will be written and read at this regular interval.
3. `rss.coordinator.remote.storage.io.sample.file.size` , the size of each read / write HDFS file.
4. `rss.coordinator.remote.storage.io.sample.access.times`, number of times to read and write HDFS files.
### How was this patch tested?
Added ut.
---
coordinator/pom.xml | 4 +
.../AppBalanceSelectStorageStrategy.java | 144 +++++++++++
.../uniffle/coordinator/ApplicationManager.java | 97 +++-----
.../uniffle/coordinator/CoordinatorConf.java | 22 +-
.../LowestIOSampleCostSelectStorageStrategy.java | 266 +++++++++++++++++++++
.../uniffle/coordinator/SelectStorageStrategy.java | 40 ++++
.../AppBalanceSelectStorageStrategyTest.java | 156 ++++++++++++
.../coordinator/ApplicationManagerTest.java | 105 +-------
.../uniffle/coordinator/ClientConfManagerTest.java | 102 +++++++-
...owestIOSampleCostSelectStorageStrategyTest.java | 201 ++++++++++++++++
docs/coordinator_guide.md | 4 +
.../apache/uniffle/test/FetchClientConfTest.java | 57 ++++-
12 files changed, 1028 insertions(+), 170 deletions(-)
diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index 686534a6..368742d3 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -73,6 +73,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
new file mode 100644
index 00000000..c42da449
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue;
+
+/**
+ * AppBalanceSelectStorageStrategy balances the number of apps assigned to each remote path:
+ * a new app is placed on the available remote path that currently has the lowest app count.
+ */
+public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathCounter;
+  /**
+   * remote path -> storage info of the paths that may be assigned; this class only reads it,
+   * entries are managed through the map returned by {@link #getAvailableRemoteStorageInfo()}
+   */
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+
+  public AppBalanceSelectStorageStrategy() {
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathCounter = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+  }
+
+  /**
+   * Returns the remote storage assigned to the app, assigning one first if the app has none yet.
+   * Candidates are the counted remote paths ordered by ascending app count; the first candidate
+   * that is also available is chosen.
+   *
+   * @param appId the application to pick a remote storage for
+   * @return the remote storage of the app, or null if no counted path is available
+   */
+  @Override
+  public RemoteStorageInfo pickRemoteStorage(String appId) {
+    RemoteStorageInfo assigned = appIdToRemoteStorageInfo.get(appId);
+    if (assigned != null) {
+      return assigned;
+    }
+
+    // create list for sort
+    List<Map.Entry<String, RankValue>> sizeList =
+        Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
+            .sorted(Comparator.comparingInt(entry -> entry.getValue().getAppNum().get()))
+            .collect(Collectors.toList());
+
+    for (Map.Entry<String, RankValue> entry : sizeList) {
+      String storagePath = entry.getKey();
+      // single lookup: a separate containsKey()/get() pair could hand null to the concurrent map
+      RemoteStorageInfo candidate = availableRemoteStorageInfo.get(storagePath);
+      if (candidate != null) {
+        // count the app only if this call created the assignment; otherwise a concurrent pick
+        // for the same app would bump the counter twice for a single assignment
+        if (appIdToRemoteStorageInfo.putIfAbsent(appId, candidate) == null) {
+          incRemoteStorageCounter(storagePath);
+        }
+        break;
+      }
+    }
+    return appIdToRemoteStorageInfo.get(appId);
+  }
+
+  /**
+   * Increases the app count of the remote path by one; a missing counter is created with count 1.
+   *
+   * @param remoteStoragePath the remote path an app was assigned to
+   */
+  @Override
+  @VisibleForTesting
+  public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
+    RankValue counter = remoteStoragePathCounter.get(remoteStoragePath);
+    if (counter != null) {
+      counter.getAppNum().incrementAndGet();
+    } else {
+      // it may be happened when assignment remote storage
+      // and refresh remote storage at the same time
+      // SLF4J only substitutes "{}" placeholders, printf-style "%s" would be logged verbatim
+      LOG.warn("Remote storage path lost during assignment: {} doesn't exist, reset it to 1",
+          remoteStoragePath);
+      remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1));
+    }
+  }
+
+  /**
+   * Decreases the app count of the remote path by one, never letting it drop below 0.
+   * An empty path is ignored. A counter that reaches 0 is removed when the path is no
+   * longer available.
+   *
+   * @param storagePath the remote path an app was released from
+   */
+  @Override
+  @VisibleForTesting
+  public synchronized void decRemoteStorageCounter(String storagePath) {
+    if (StringUtils.isEmpty(storagePath)) {
+      return;
+    }
+    RankValue atomic = remoteStoragePathCounter.get(storagePath);
+    if (atomic != null) {
+      int count = atomic.getAppNum().decrementAndGet();
+      if (count < 0) {
+        LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0",
+            storagePath, count);
+        atomic.getAppNum().set(0);
+      }
+    } else {
+      LOG.warn("Can't find counter for remote storage: {}", storagePath);
+      remoteStoragePathCounter.putIfAbsent(storagePath, new RankValue(0));
+    }
+    if (remoteStoragePathCounter.get(storagePath).getAppNum().get() == 0
+        && !availableRemoteStorageInfo.containsKey(storagePath)) {
+      remoteStoragePathCounter.remove(storagePath);
+    }
+  }
+
+  /**
+   * Removes the counter of the remote path, but only if no app is counted on it.
+   *
+   * @param storagePath the remote path whose counter should be dropped
+   */
+  @Override
+  public synchronized void removePathFromCounter(String storagePath) {
+    RankValue atomic = remoteStoragePathCounter.get(storagePath);
+    if (atomic != null && atomic.getAppNum().get() == 0) {
+      remoteStoragePathCounter.remove(storagePath);
+    }
+  }
+
+  /**
+   * @return the live appId -> remote storage map of this strategy (not a copy)
+   */
+  @Override
+  public Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
+    return appIdToRemoteStorageInfo;
+  }
+
+  /**
+   * @return the live remote path -> rank value map of this strategy (not a copy)
+   */
+  @Override
+  public Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathCounter;
+  }
+
+  /**
+   * @return the live remote path -> available storage info map of this strategy (not a copy)
+   */
+  @Override
+  public Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo() {
+    return availableRemoteStorageInfo;
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 40a1e1ee..6fcd6b56 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -19,16 +19,12 @@ package org.apache.uniffle.coordinator;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -41,24 +37,38 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue;
public class ApplicationManager {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationManager.class);
private long expired;
+ private StrategyName storageStrategy;
private Map<String, Long> appIds = Maps.newConcurrentMap();
+ private SelectStorageStrategy selectStorageStrategy;
// store appId -> remote path to make sure all shuffle data of the same application
// will be written to the same remote storage
- private Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+ private Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
// store remote path -> application count for assignment strategy
- private Map<String, AtomicInteger> remoteStoragePathCounter = Maps.newConcurrentMap();
+ private Map<String, RankValue> remoteStoragePathRankValue;
private Map<String, String> remoteStorageToHost = Maps.newConcurrentMap();
- private Map<String, RemoteStorageInfo> availableRemoteStorageInfo = Maps.newHashMap();
+ private Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
private ScheduledExecutorService scheduledExecutorService;
// it's only for test case to check if status check has problem
private boolean hasErrorInStatusCheck = false;
public ApplicationManager(CoordinatorConf conf) {
+ storageStrategy = conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY);
+ if (StrategyName.IO_SAMPLE == storageStrategy) {
+ selectStorageStrategy = new LowestIOSampleCostSelectStorageStrategy(conf);
+ } else if (StrategyName.APP_BALANCE == storageStrategy) {
+ selectStorageStrategy = new AppBalanceSelectStorageStrategy();
+ } else {
+ throw new UnsupportedOperationException("Unsupported selected storage strategy.");
+ }
+ appIdToRemoteStorageInfo = selectStorageStrategy.getAppIdToRemoteStorageInfo();
+ remoteStoragePathRankValue = selectStorageStrategy.getRemoteStoragePathRankValue();
+ availableRemoteStorageInfo = selectStorageStrategy.getAvailableRemoteStorageInfo();
expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
// the thread for checking application status
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
@@ -83,7 +93,7 @@ public class ApplicationManager {
// add remote path if not exist
for (String path : paths) {
if (!availableRemoteStorageInfo.containsKey(path)) {
- remoteStoragePathCounter.putIfAbsent(path, new AtomicInteger(0));
+ remoteStoragePathRankValue.putIfAbsent(path, new RankValue(0));
// refreshRemoteStorage is designed without multiple thread problem
// metrics shouldn't be added duplicated
addRemoteStorageMetrics(path);
@@ -117,67 +127,17 @@ public class ApplicationManager {
// the strategy of pick remote storage is according to assignment count
// todo: better strategy with workload balance
public RemoteStorageInfo pickRemoteStorage(String appId) {
- if (appIdToRemoteStorageInfo.containsKey(appId)) {
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- // create list for sort
- List<Map.Entry<String, AtomicInteger>> sizeList =
- Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
- .sorted(Comparator.comparingInt(entry -> entry.getValue().get())).collect(Collectors.toList());
-
- for (Map.Entry<String, AtomicInteger> entry : sizeList) {
- String storagePath = entry.getKey();
- if (availableRemoteStorageInfo.containsKey(storagePath)) {
- appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath));
- incRemoteStorageCounter(storagePath);
- break;
- }
- }
+ selectStorageStrategy.pickRemoteStorage(appId);
return appIdToRemoteStorageInfo.get(appId);
}
- @VisibleForTesting
- protected synchronized void incRemoteStorageCounter(String remoteStoragePath) {
- AtomicInteger counter = remoteStoragePathCounter.get(remoteStoragePath);
- if (counter != null) {
- counter.incrementAndGet();
- } else {
- // it may be happened when assignment remote storage
- // and refresh remote storage at the same time
- LOG.warn("Remote storage path lost during assignment: %s doesn't exist, reset it to 1",
- remoteStoragePath);
- remoteStoragePathCounter.put(remoteStoragePath, new AtomicInteger(1));
- }
- }
-
@VisibleForTesting
protected synchronized void decRemoteStorageCounter(String storagePath) {
- if (!StringUtils.isEmpty(storagePath)) {
- AtomicInteger atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null) {
- int count = atomic.decrementAndGet();
- if (count < 0) {
- LOG.warn("Unexpected counter for remote storage: %s, which is %i, reset to 0",
- storagePath, count);
- atomic.set(0);
- }
- } else {
- LOG.warn("Can't find counter for remote storage: {}", storagePath);
- remoteStoragePathCounter.putIfAbsent(storagePath, new AtomicInteger(0));
- }
- if (remoteStoragePathCounter.get(storagePath).get() == 0
- && !availableRemoteStorageInfo.containsKey(storagePath)) {
- remoteStoragePathCounter.remove(storagePath);
- }
- }
+ selectStorageStrategy.decRemoteStorageCounter(storagePath);
}
private synchronized void removePathFromCounter(String storagePath) {
- AtomicInteger atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null && atomic.get() == 0) {
- remoteStoragePathCounter.remove(storagePath);
- }
+ selectStorageStrategy.removePathFromCounter(storagePath);
}
public Set<String> getAppIds() {
@@ -185,13 +145,13 @@ public class ApplicationManager {
}
@VisibleForTesting
- protected Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
- return appIdToRemoteStorageInfo;
+ protected Map<String, RankValue> getRemoteStoragePathRankValue() {
+ return remoteStoragePathRankValue;
}
@VisibleForTesting
- protected Map<String, AtomicInteger> getRemoteStoragePathCounter() {
- return remoteStoragePathCounter;
+ public SelectStorageStrategy getSelectStorageStrategy() {
+ return selectStorageStrategy;
}
@VisibleForTesting
@@ -237,7 +197,7 @@ public class ApplicationManager {
try {
String storageHost = getStorageHost(remoteStoragePath);
CoordinatorMetrics.updateDynamicGaugeForRemoteStorage(storageHost,
- remoteStoragePathCounter.get(remoteStoragePath).get());
+ remoteStoragePathRankValue.get(remoteStoragePath).getAppNum().get());
} catch (Exception e) {
LOG.warn("Update remote storage metrics for {} failed ", remoteStoragePath);
}
@@ -266,4 +226,9 @@ public class ApplicationManager {
}
return storageHost;
}
+
+ /**
+ * Names of the remote storage selection strategies the ApplicationManager constructor accepts.
+ */
+ public enum StrategyName {
+ // the constructor creates an AppBalanceSelectStorageStrategy for this value
+ APP_BALANCE,
+ // the constructor creates a LowestIOSampleCostSelectStorageStrategy for this value
+ IO_SAMPLE
+ }
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 34c65b98..28931a42 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -26,6 +26,7 @@ import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.RssUtils;
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
import static org.apache.uniffle.coordinator.AssignmentStrategyFactory.StrategyName.PARTITION_BALANCE;
/**
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
.stringType()
.noDefaultValue()
.withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+ public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+ ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+ .enumType(ApplicationManager.StrategyName.class)
+ .defaultValue(APP_BALANCE)
+ .withDescription("Strategy for selecting the remote path");
+ public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME = ConfigOptions
+ .key("rss.coordinator.remote.storage.io.sample.schedule.time")
+ .longType()
+ .defaultValue(60 * 1000L)
+ .withDescription("The time of scheduling the read and write time of the paths to obtain different HDFS");
+ public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE = ConfigOptions
+ .key("rss.coordinator.remote.storage.io.sample.file.size")
+ .intType()
+ .defaultValue(204800 * 1000)
+ .withDescription("The size of the file that the scheduled thread reads and writes");
+ public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES = ConfigOptions
+ .key("rss.coordinator.remote.storage.io.sample.access.times")
+ .intType()
+ .defaultValue(3)
+ .withDescription("The number of times to read and write HDFS files");
public CoordinatorConf() {
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
new file mode 100644
index 00000000..96147b8e
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * LowestIOSampleCostSelectStorageStrategy ranks the remote paths by the time a periodic
+ * write / read / delete sample takes on each of them and assigns new apps to the available path
+ * with the lowest sampled cost. Only the I/O cost is considered, therefore it may occur that all
+ * apps are written to the same cluster. A path whose sample fails (exception, or data read back
+ * that differs from the data written) gets the cost Long.MAX_VALUE and is therefore ranked last.
+ */
+public class LowestIOSampleCostSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LowestIOSampleCostSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> rank value (sampled I/O cost and application count)
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  /**
+   * remote path -> storage info of the paths that may be assigned; this class only reads it,
+   * entries are managed through the map returned by {@link #getAvailableRemoteStorageInfo()}
+   */
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  /**
+   * remote paths ordered by ascending sampled cost; replaced as a whole by the sampling thread
+   * and read by pickRemoteStorage callers, hence volatile
+   */
+  private volatile List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  /**
+   * Reads the sample file size and the number of sample rounds from the conf and starts a
+   * single-thread scheduler that runs {@link #checkReadAndWrite()} after 1000 ms and then at the
+   * configured schedule interval.
+   *
+   * @param cf the coordinator conf providing the IO sample settings
+   */
+  public LowestIOSampleCostSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Samples every known remote path when there is more than one: writes and reads back a test
+   * file the configured number of times, records the elapsed time as the cost of the path and
+   * re-sorts the candidates. With at most one path nothing is sampled and the candidate list is
+   * simply refreshed from the known paths.
+   */
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        // stays true only if every round wrote and read back identical data without an exception
+        boolean healthy = true;
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; healthy && j < readAndWriteTimes; j++) {
+            healthy = writeAndVerify(testPath);
+          }
+          if (!healthy) {
+            LOG.error("Data read back differs from data written, we will not use this remote path {}.", path);
+          }
+        } catch (Exception e) {
+          healthy = false;
+          LOG.error("Storage read and write error, we will not use this remote path {}.", path, e);
+        } finally {
+          // the outcome is passed along so that a failure is not overwritten by the elapsed time
+          recordSampleResult(path, testPath, startWriteTime, healthy);
+        }
+      }
+    } else {
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+    }
+  }
+
+  /**
+   * Writes fileSize random bytes to the test file and reads them back.
+   *
+   * @param testPath the file to write and read with the current file system
+   * @return true if exactly the written bytes were read back, false on truncated or different data
+   * @throws Exception if creating, writing, opening or reading the file fails
+   */
+  private boolean writeAndVerify(Path testPath) throws Exception {
+    byte[] data = RandomUtils.nextBytes(fileSize);
+    try (FSDataOutputStream fos = fs.create(testPath)) {
+      fos.write(data);
+      fos.flush();
+    }
+    byte[] readData = new byte[fileSize];
+    int hasReadBytes = 0;
+    try (FSDataInputStream fis = fs.open(testPath)) {
+      // a single read() may return fewer bytes than requested, so read until the buffer is full;
+      // bounding the loop by fileSize also terminates for a configured size of 0
+      while (hasReadBytes < fileSize) {
+        int readBytes = fis.read(readData, hasReadBytes, fileSize - hasReadBytes);
+        if (readBytes < 0) {
+          // end of file before everything that was written came back
+          return false;
+        }
+        hasReadBytes += readBytes;
+      }
+    }
+    for (int i = 0; i < fileSize; i++) {
+      if (data[i] != readData[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Deletes the test file, records the time elapsed since startWrite as the cost of the path
+   * (Long.MAX_VALUE if the delete fails) and re-sorts the candidates by ascending cost.
+   *
+   * @param path the remote path that was sampled
+   * @param testPath the test file to delete with the current file system
+   * @param startWrite the time in milliseconds at which the sample started
+   */
+  @VisibleForTesting
+  public void sortPathByRankValue(String path, Path testPath, long startWrite) {
+    recordSampleResult(path, testPath, startWrite, true);
+  }
+
+  /**
+   * Deletes the test file, stores the cost of the sample and re-sorts the candidates.
+   * The cost is the elapsed time for a healthy sample whose test file could be deleted,
+   * otherwise Long.MAX_VALUE.
+   */
+  private void recordSampleResult(String path, Path testPath, long startWrite, boolean healthy) {
+    long cost = Long.MAX_VALUE;
+    try {
+      fs.delete(testPath, true);
+      if (healthy) {
+        cost = System.currentTimeMillis() - startWrite;
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to sort, we will not use this remote path {}.", path, e);
+    } finally {
+      // update in place: replacing the RankValue would drop app count changes made concurrently,
+      // and a path removed in the meantime must not throw and cancel the scheduled sampling
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      if (rankValue != null) {
+        rankValue.getReadAndWriteTime().set(cost);
+      }
+      sizeList = remoteStoragePathRankValue.entrySet().stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingLong(entry -> entry.getValue().getReadAndWriteTime().get()))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Returns the remote storage assigned to the app, assigning one first if the app has none yet.
+   * The candidates are taken in the order of the last sampling, i.e. lowest I/O cost first; the
+   * first candidate that is also available is chosen.
+   *
+   * @param appId the application to pick a remote storage for
+   * @return the remote storage of the app, or null if no sampled path is available
+   */
+  @Override
+  public RemoteStorageInfo pickRemoteStorage(String appId) {
+    RemoteStorageInfo assigned = appIdToRemoteStorageInfo.get(appId);
+    if (assigned != null) {
+      return assigned;
+    }
+
+    for (Map.Entry<String, RankValue> entry : sizeList) {
+      String storagePath = entry.getKey();
+      // single lookup: a separate containsKey()/get() pair could hand null to the concurrent map
+      RemoteStorageInfo candidate = availableRemoteStorageInfo.get(storagePath);
+      if (candidate != null) {
+        // count the app only if this call created the assignment; otherwise a concurrent pick
+        // for the same app would bump the counter twice for a single assignment
+        if (appIdToRemoteStorageInfo.putIfAbsent(appId, candidate) == null) {
+          incRemoteStorageCounter(storagePath);
+        }
+        break;
+      }
+    }
+    return appIdToRemoteStorageInfo.get(appId);
+  }
+
+  /**
+   * Increases the app count of the remote path by one; a missing rank value is created with
+   * cost 0 and app count 1.
+   *
+   * @param remoteStoragePath the remote path an app was assigned to
+   */
+  @Override
+  @VisibleForTesting
+  public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
+    RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath);
+    if (counter != null) {
+      counter.getAppNum().incrementAndGet();
+    } else {
+      remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1));
+      // it may be happened when assignment remote storage
+      // and refresh remote storage at the same time
+      // SLF4J only substitutes "{}" placeholders, printf-style "%s" would be logged verbatim
+      LOG.warn("Remote storage path lost during assignment: {} doesn't exist, "
+          + "reset the rank value to 0 and app size to 1.", remoteStoragePath);
+    }
+  }
+
+  /**
+   * Decreases the app count of the remote path by one, never letting it drop below 0.
+   * An empty path is ignored. A rank value whose app count reaches 0 is removed when the path
+   * is no longer available.
+   *
+   * @param storagePath the remote path an app was released from
+   */
+  @Override
+  @VisibleForTesting
+  public synchronized void decRemoteStorageCounter(String storagePath) {
+    if (StringUtils.isEmpty(storagePath)) {
+      return;
+    }
+    RankValue atomic = remoteStoragePathRankValue.get(storagePath);
+    if (atomic != null) {
+      int count = atomic.getAppNum().decrementAndGet();
+      if (count < 0) {
+        LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0",
+            storagePath, count);
+        atomic.getAppNum().set(0);
+      }
+    } else {
+      // nothing is counted on an unknown path, so start from 0 as AppBalanceSelectStorageStrategy
+      // does; starting from 1 would leave an app count that can never be released
+      remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(0));
+      LOG.warn("Can't find counter for remote storage: {}", storagePath);
+    }
+
+    if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
+        && !availableRemoteStorageInfo.containsKey(storagePath)) {
+      remoteStoragePathRankValue.remove(storagePath);
+    }
+  }
+
+  /**
+   * Removes the rank value of the remote path, but only if no app is counted on it.
+   *
+   * @param storagePath the remote path whose rank value should be dropped
+   */
+  @Override
+  public synchronized void removePathFromCounter(String storagePath) {
+    RankValue rankValue = remoteStoragePathRankValue.get(storagePath);
+    // The time spent reading and writing cannot be used to determine whether the current path is still used by apps.
+    // Therefore, determine whether the HDFS path is still used by the number of apps
+    if (rankValue != null && rankValue.getAppNum().get() == 0) {
+      remoteStoragePathRankValue.remove(storagePath);
+    }
+  }
+
+  /**
+   * @return the live appId -> remote storage map of this strategy (not a copy)
+   */
+  @Override
+  public Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
+    return appIdToRemoteStorageInfo;
+  }
+
+  /**
+   * @return the live remote path -> rank value map of this strategy (not a copy)
+   */
+  @Override
+  public Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathRankValue;
+  }
+
+  /**
+   * @return the live remote path -> available storage info map of this strategy (not a copy)
+   */
+  @Override
+  public Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo() {
+    return availableRemoteStorageInfo;
+  }
+
+  @VisibleForTesting
+  public void setFs(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @VisibleForTesting
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Rank of a remote path: the sampled read and write time plus the number of apps counted on it.
+   */
+  static class RankValue {
+    AtomicLong readAndWriteTime;
+    AtomicInteger appNum;
+
+    /**
+     * Creates a rank value with read and write time 0 and the given app count.
+     */
+    RankValue(int appNum) {
+      this.readAndWriteTime = new AtomicLong(0);
+      this.appNum = new AtomicInteger(appNum);
+    }
+
+    /**
+     * Creates a rank value with the given read and write time and app count.
+     */
+    RankValue(long ratioValue, int appNum) {
+      this.readAndWriteTime = new AtomicLong(ratioValue);
+      this.appNum = new AtomicInteger(appNum);
+    }
+
+    public AtomicLong getReadAndWriteTime() {
+      return readAndWriteTime;
+    }
+
+    public AtomicInteger getAppNum() {
+      return appNum;
+    }
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
new file mode 100644
index 00000000..654ec8a0
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Map;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue;
+
+/**
+ * Strategy used by the coordinator to choose the remote storage path of an application and to
+ * keep the per-path bookkeeping that the choice is based on.
+ */
+public interface SelectStorageStrategy {
+
+ /**
+ * Returns the remote storage for the given app. The implementations in this package return
+ * the existing assignment if there is one and otherwise assign an available remote path;
+ * they return null when no path can be assigned.
+ */
+ RemoteStorageInfo pickRemoteStorage(String appId);
+
+ /**
+ * Increases the number of apps counted on the given remote path by one.
+ */
+ void incRemoteStorageCounter(String remoteStoragePath);
+
+ /**
+ * Decreases the number of apps counted on the given remote path by one.
+ */
+ void decRemoteStorageCounter(String storagePath);
+
+ /**
+ * Drops the bookkeeping entry of the given remote path; the implementations in this package
+ * only do so when no app is counted on the path.
+ */
+ void removePathFromCounter(String storagePath);
+
+ /**
+ * Returns the remote path -> storage info map of the paths that may be assigned; the
+ * implementations in this package return their internal map, not a copy.
+ */
+ Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo();
+
+ /**
+ * Returns the appId -> assigned remote storage map; the implementations in this package
+ * return their internal map, not a copy.
+ */
+ Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo();
+
+ /**
+ * Returns the remote path -> rank value map; the implementations in this package return
+ * their internal map, not a copy.
+ */
+ Map<String, RankValue> getRemoteStoragePathRankValue();
+}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
new file mode 100644
index 00000000..d52d65aa
--- /dev/null
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.util.Constants;
+
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AppBalanceSelectStorageStrategyTest {
+
+ private AppBalanceSelectStorageStrategy appBalanceSelectStorageStrategy;
+ private ApplicationManager applicationManager;
+ private long appExpiredTime = 2000L;
+ private String remotePath1 = "hdfs://path1";
+ private String remotePath2 = "hdfs://path2";
+ private String remotePath3 = "hdfs://path3";
+
+ /** Registers the coordinator metrics once for the class; the tests read gaugeInUsedRemoteStorage. */
+ @BeforeAll
+ public static void setup() {
+ CoordinatorMetrics.register();
+ }
+
+ /** Clears the coordinator metrics after the last test of the class. */
+ @AfterAll
+ public static void clear() {
+ CoordinatorMetrics.clear();
+ }
+
+ /**
+ * Builds a new ApplicationManager configured with the APP_BALANCE strategy and the short
+ * app expiry used by these tests, and keeps a typed reference to its strategy.
+ */
+ @BeforeEach
+ public void setUp() {
+ CoordinatorConf conf = new CoordinatorConf();
+ conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
+ conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, APP_BALANCE);
+ applicationManager = new ApplicationManager(conf);
+ // the cast documents that APP_BALANCE is expected to yield AppBalanceSelectStorageStrategy
+ appBalanceSelectStorageStrategy = (AppBalanceSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
+ }
+
+ /**
+ * Walks a single application through the APP_BALANCE strategy: initial counters, first pick,
+ * repeated pick, expiry, removal of a path that is still in use, and final cleanup.
+ * The steps depend on each other and on the sleeps, so their order matters.
+ */
+ @Test
+ public void selectStorageTest() throws Exception {
+ String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2;
+ applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+ // both paths start with an app counter of 0
+ assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath1).getAppNum().get());
+ assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+ // the in-use gauges are looked up by "path1"/"path2", not by the full hdfs:// path
+ String storageHost1 = "path1";
+ assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
+ String storageHost2 = "path2";
+ assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+
+ // increment remotePath1 twice so that the next pick is expected to be remotePath2
+ appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1);
+ appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1);
+ String testApp1 = "testApp1";
+ applicationManager.refreshAppId(testApp1);
+ assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+ assertEquals(remotePath2, appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
+ assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+ // picking again for an app that already has an assignment returns the same storage
+ // and must not increase the counter a second time
+ assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+ assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+
+ // sleep longer than appExpiredTime: testApp1 is expected to have expired afterwards
+ Thread.sleep(appExpiredTime + 2000);
+ assertNull(appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1));
+ assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+ assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+
+ // refresh app1 so it gets remotePath2 again, then remove remotePath2 from the configuration:
+ // the path must stay in the rank map until the app using it has expired
+ applicationManager.refreshAppId(testApp1);
+ assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+ remoteStoragePath = remotePath1;
+ applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+ assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)),
+ appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+ assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
+ // once app1 has expired, remotePath2 is removed because its counter is back to 0
+ Thread.sleep(appExpiredTime + 2000);
+ assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)),
+ appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+
+ // undo the two manual increments made at the start of this test
+ appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1);
+ appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1);
+ // remove all remote storages and check that nothing is left behind
+ applicationManager.refreshRemoteStorage("", "");
+ assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size());
+ assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size());
+ assertFalse(applicationManager.hasErrorInStatusCheck());
+ }
+
+ /**
+  * Picks storage for 3000 distinct app ids from three concurrent threads, waits past the
+  * app expiry time, then removes every storage and checks that no state or status-check
+  * error is left behind.
+  */
+ @Test
+ public void storageCounterMulThreadTest() throws Exception {
+   final String threeStorages = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2
+       + Constants.COMMA_SPLIT_CHAR + remotePath3;
+   applicationManager.refreshRemoteStorage(threeStorages, "");
+   final String appPrefix = "testAppId";
+
+   // one picker thread per block of 1000 app ids: [0, 1000), [1000, 2000), [2000, 3000)
+   final Thread[] pickers = new Thread[3];
+   for (int slot = 0; slot < pickers.length; slot++) {
+     final int firstId = slot * 1000;
+     final int endId = firstId + 1000;
+     pickers[slot] = new Thread(() -> {
+       for (int id = firstId; id < endId; id++) {
+         String appId = appPrefix + id;
+         applicationManager.refreshAppId(appId);
+         appBalanceSelectStorageStrategy.pickRemoteStorage(appId);
+       }
+     });
+   }
+   // start all pickers before joining any of them so they really run concurrently
+   for (Thread picker : pickers) {
+     picker.start();
+   }
+   for (Thread picker : pickers) {
+     picker.join();
+   }
+   // sleep longer than the app expiry time before removing every storage
+   Thread.sleep(appExpiredTime + 2000);
+
+   applicationManager.refreshRemoteStorage("", "");
+   assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size());
+   assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size());
+   assertFalse(applicationManager.hasErrorInStatusCheck());
+ }
+}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 8cbccbc2..791ec1dc 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -31,7 +31,6 @@ import org.apache.uniffle.common.util.Constants;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ApplicationManagerTest {
@@ -66,19 +65,19 @@ public class ApplicationManagerTest {
Set<String> expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
applicationManager.refreshRemoteStorage(remoteStoragePath, "");
assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
- assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
+ assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
remoteStoragePath = remotePath3;
expectedAvailablePath = Sets.newHashSet(remotePath3);
applicationManager.refreshRemoteStorage(remoteStoragePath, "");
assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
- assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
+ assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath3;
expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath3);
applicationManager.refreshRemoteStorage(remoteStoragePath, remoteStorageConf);
assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
- assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
+ assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
Map<String, RemoteStorageInfo> storages = applicationManager.getAvailableRemoteStorageInfo();
RemoteStorageInfo remoteStorageInfo = storages.get(remotePath1);
assertEquals(2, remoteStorageInfo.getConfItems().size());
@@ -94,59 +93,7 @@ public class ApplicationManagerTest {
assertEquals(1, remoteStorageInfo.getConfItems().size());
assertEquals("v3", remoteStorageInfo.getConfItems().get("k3"));
assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
- assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet());
- assertFalse(applicationManager.hasErrorInStatusCheck());
- }
-
- @Test
- public void storageCounterTest() throws Exception {
- String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2;
- applicationManager.refreshRemoteStorage(remoteStoragePath, "");
- assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath1).get());
- assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
- String storageHost1 = "path1";
- assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
- String storageHost2 = "path2";
- assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
-
- // do inc for remotePath1 to make sure pick storage will be remotePath2 in next call
- applicationManager.incRemoteStorageCounter(remotePath1);
- applicationManager.incRemoteStorageCounter(remotePath1);
- String testApp1 = "testApp1";
- applicationManager.refreshAppId(testApp1);
- assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
- assertEquals(remotePath2, applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
- assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
- // return the same value if did the assignment already
- assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
- assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
-
- Thread.sleep(appExpiredTime + 2000);
- assertNull(applicationManager.getAppIdToRemoteStorageInfo().get(testApp1));
- assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
- assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
-
- // refresh app1, got remotePath2, then remove remotePath2,
- // it should be existed in counter until it expired
- applicationManager.refreshAppId(testApp1);
- assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
- remoteStoragePath = remotePath1;
- applicationManager.refreshRemoteStorage(remoteStoragePath, "");
- assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)),
- applicationManager.getRemoteStoragePathCounter().keySet());
- assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get());
- // app1 is expired, remotePath2 is removed because of counter = 0
- Thread.sleep(appExpiredTime + 2000);
- assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)),
- applicationManager.getRemoteStoragePathCounter().keySet());
-
- // restore previous manually inc for next test case
- applicationManager.decRemoteStorageCounter(remotePath1);
- applicationManager.decRemoteStorageCounter(remotePath1);
- // remove all remote storage
- applicationManager.refreshRemoteStorage("", "");
- assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
- assertEquals(0, applicationManager.getRemoteStoragePathCounter().size());
+ assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet());
assertFalse(applicationManager.hasErrorInStatusCheck());
}
@@ -163,48 +110,4 @@ public class ApplicationManagerTest {
assertEquals(0, applicationManager.getAppIds().size());
assertFalse(applicationManager.hasErrorInStatusCheck());
}
-
- @Test
- public void storageCounterMulThreadTest() throws Exception {
- String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2
- + Constants.COMMA_SPLIT_CHAR + remotePath3;
- applicationManager.refreshRemoteStorage(remoteStoragePath, "");
- String appPrefix = "testAppId";
-
- Thread pickThread1 = new Thread(() -> {
- for (int i = 0; i < 1000; i++) {
- String appId = appPrefix + i;
- applicationManager.refreshAppId(appId);
- applicationManager.pickRemoteStorage(appId);
- }
- });
-
- Thread pickThread2 = new Thread(() -> {
- for (int i = 1000; i < 2000; i++) {
- String appId = appPrefix + i;
- applicationManager.refreshAppId(appId);
- applicationManager.pickRemoteStorage(appId);
- }
- });
-
- Thread pickThread3 = new Thread(() -> {
- for (int i = 2000; i < 3000; i++) {
- String appId = appPrefix + i;
- applicationManager.refreshAppId(appId);
- applicationManager.pickRemoteStorage(appId);
- }
- });
- pickThread1.start();
- pickThread2.start();
- pickThread3.start();
- pickThread1.join();
- pickThread2.join();
- pickThread3.join();
- Thread.sleep(appExpiredTime + 2000);
-
- applicationManager.refreshRemoteStorage("", "");
- assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
- assertEquals(0, applicationManager.getRemoteStoragePathCounter().size());
- assertFalse(applicationManager.hasErrorInStatusCheck());
- }
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
index c5bfc0f2..7a618174 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator;
import java.io.File;
import java.io.FileWriter;
+import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.util.Map;
@@ -28,6 +29,10 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -36,6 +41,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -43,6 +49,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientConfManagerTest {
+ @TempDir
+ private final File remotePath = new File("hdfs://rss");
+ private static MiniDFSCluster cluster;
+ private Configuration hdfsConf = new Configuration();
+
@BeforeEach
public void setUp() {
CoordinatorMetrics.register();
@@ -53,6 +64,18 @@ public class ClientConfManagerTest {
CoordinatorMetrics.clear();
}
+ /**
+  * Shuts down the shared MiniDFSCluster after the last test of the class.
+  * The cluster is only assigned by {@code createMiniHdfs}, so it is still {@code null} when
+  * no test that calls it has run (for example when a single other test is executed).
+  */
+ @AfterAll
+ public static void close() {
+   if (cluster != null) {
+     cluster.close();
+   }
+ }
+
+ /**
+  * Configures {@code hdfsConf} and builds the MiniDFSCluster stored in the static
+  * {@code cluster} field.
+  *
+  * @param hdfsPath directory handed to MiniDFSCluster as its base directory
+  * @throws IOException if the cluster cannot be built
+  */
+ public void createMiniHdfs(String hdfsPath) throws IOException {
+   final String defaultFs = remotePath.getAbsolutePath();
+   hdfsConf.set("fs.defaultFS", defaultFs);
+   hdfsConf.set("dfs.nameservices", "rss");
+   hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath);
+   final MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(hdfsConf);
+   cluster = clusterBuilder.build();
+ }
+
@Test
public void test(@TempDir File tempDir) throws Exception {
File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
@@ -137,7 +160,7 @@ public class ClientConfManagerTest {
}
@Test
- public void dynamicRemoteStorageTest() throws Exception {
+ public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception {
int updateIntervalSec = 2;
final String remotePath1 = "hdfs://host1/path1";
final String remotePath2 = "hdfs://host2/path2";
@@ -194,6 +217,83 @@ public class ClientConfManagerTest {
clientConfManager.close();
}
+ /**
+  * Checks that remote storages written to the dynamic client conf file are picked up by the
+  * ApplicationManager when the IO_SAMPLE selection strategy is configured, and that the
+  * storage returned by pickRemoteStorage carries the conf items of its cluster.
+  * The steps rely on the sleeps and on the order of the sortPathByRankValue calls.
+  */
+ @Test
+ public void dynamicRemoteByHealthStrategyStorageTest() throws Exception {
+   final int updateIntervalSec = 2;
+   final String remotePath1 = "hdfs://host1/path1";
+   final String remotePath2 = "hdfs://host2/path2";
+   final String remotePath3 = "hdfs://host3/path3";
+   File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", ".conf").toFile();
+   cfgFile.deleteOnExit();
+   writeRemoteStorageConf(cfgFile, remotePath1);
+   createMiniHdfs(remotePath.getAbsolutePath());
+
+   CoordinatorConf conf = new CoordinatorConf();
+   conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, updateIntervalSec);
+   conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString());
+   conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
+   conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+
+   ApplicationManager applicationManager = new ApplicationManager(conf);
+   // init IORankScheduler
+   Thread.sleep(2000);
+   LowestIOSampleCostSelectStorageStrategy selectStorageStrategy =
+       (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
+   // point the strategy at the mini cluster's file system
+   Path testPath = new Path("/test");
+   FileSystem fs = testPath.getFileSystem(hdfsConf);
+   selectStorageStrategy.setFs(fs);
+
+   final ClientConfManager clientConfManager = new ClientConfManager(conf, new Configuration(), applicationManager);
+   // presumably gives the manager time to load the conf file once - TODO confirm
+   Thread.sleep(500);
+   Set<String> expectedAvailablePath = Sets.newHashSet(remotePath1);
+   assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet());
+   selectStorageStrategy.sortPathByRankValue(remotePath1, testPath, System.currentTimeMillis());
+   RemoteStorageInfo remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId1");
+   assertEquals(remotePath1, remoteStorageInfo.getPath());
+   assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+   // replace the only storage with remotePath3
+   writeRemoteStorageConf(cfgFile, remotePath3);
+   expectedAvailablePath = Sets.newHashSet(remotePath3);
+   waitForUpdate(expectedAvailablePath, applicationManager);
+   // The reason for setting the filesystem here is to trigger the execution of sortPathByRankValue
+   selectStorageStrategy.setFs(fs);
+   selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, System.currentTimeMillis());
+   remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2");
+   assertEquals(remotePath3, remoteStorageInfo.getPath());
+
+   // two storages with per-cluster conf items; both are ranked from the same start time
+   String confItems = "host2,k1=v1,k2=v2;host3,k3=v3";
+   final long current = System.currentTimeMillis();
+   writeRemoteStorageConf(cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, confItems);
+   expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3);
+   waitForUpdate(expectedAvailablePath, applicationManager);
+   selectStorageStrategy.setFs(fs);
+   selectStorageStrategy.sortPathByRankValue(remotePath2, testPath, current);
+   selectStorageStrategy.setFs(fs);
+   selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, current);
+   // remotePath2 is ranked first and is the storage the test expects to be picked
+   remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3");
+   assertEquals(remotePath2, remoteStorageInfo.getPath());
+   assertEquals(2, remoteStorageInfo.getConfItems().size());
+   assertEquals("v1", remoteStorageInfo.getConfItems().get("k1"));
+   assertEquals("v2", remoteStorageInfo.getConfItems().get("k2"));
+
+   confItems = "host1,keyTest1=test1,keyTest2=test2;host2,k1=deadbeaf";
+   writeRemoteStorageConf(cfgFile, remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2, confItems);
+   expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2);
+   waitForUpdate(expectedAvailablePath, applicationManager);
+   remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId4");
+   // one of remote storage will be chosen; each alternative is grouped explicitly so the
+   // conf-item checks clearly belong to the path they describe
+   assertTrue(
+       (remotePath1.equals(remoteStorageInfo.getPath())
+           && remoteStorageInfo.getConfItems().size() == 2
+           && remoteStorageInfo.getConfItems().get("keyTest1").equals("test1")
+           && remoteStorageInfo.getConfItems().get("keyTest2").equals("test2"))
+       || (remotePath2.equals(remoteStorageInfo.getPath())
+           && remoteStorageInfo.getConfItems().size() == 1
+           && remoteStorageInfo.getConfItems().get("k1").equals("deadbeaf")));
+
+   clientConfManager.close();
+ }
+
private void writeRemoteStorageConf(File cfgFile, String value) throws Exception {
writeRemoteStorageConf(cfgFile, value, null);
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
new file mode 100644
index 00000000..41eec3e7
--- /dev/null
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.File;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.common.util.Constants;
+
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LowestIOSampleCostSelectStorageStrategyTest {
+
+ private LowestIOSampleCostSelectStorageStrategy selectStorageStrategy;
+ private ApplicationManager applicationManager;
+ private static final Configuration hdfsConf = new Configuration();
+ private static MiniDFSCluster cluster;
+ private final long appExpiredTime = 2000L;
+ private final String remoteStorage1 = "hdfs://p1";
+ private final String remoteStorage2 = "hdfs://p2";
+ private final String remoteStorage3 = "hdfs://p3";
+ private final Path testFile = new Path("test");
+
+ @TempDir
+ private static File remotePath = new File("hdfs://rss");
+
+ /** Registers the coordinator metrics once for the class; the tests read gaugeInUsedRemoteStorage. */
+ @BeforeAll
+ public static void setup() {
+ CoordinatorMetrics.register();
+ }
+
+ /**
+  * Clears the coordinator metrics and shuts down the MiniDFSCluster after the last test.
+  * The cluster is only assigned in {@code setUpHdfs}; if that never succeeded the field is
+  * still {@code null} and must not be dereferenced here.
+  */
+ @AfterAll
+ public static void clear() {
+   CoordinatorMetrics.clear();
+   if (cluster != null) {
+     cluster.close();
+   }
+ }
+
+ /** Runs before every test: builds the MiniDFSCluster and the coordinator state via setUpHdfs. */
+ @BeforeEach
+ public void init() throws Exception {
+ setUpHdfs(remotePath.getAbsolutePath());
+ }
+
+ /**
+  * Builds a MiniDFSCluster rooted at {@code hdfsPath} and an ApplicationManager configured
+  * with the IO_SAMPLE strategy, then hands the HDFS configuration to the strategy.
+  *
+  * @param hdfsPath directory handed to MiniDFSCluster as its base directory
+  * @throws Exception if the cluster cannot be built or the thread is interrupted while waiting
+  */
+ public void setUpHdfs(String hdfsPath) throws Exception {
+   // This runs before every test and overwrites the static field below: close the cluster
+   // left over from the previous test instead of leaking it on the same base directory.
+   if (cluster != null) {
+     cluster.close();
+   }
+   hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath());
+   hdfsConf.set("dfs.nameservices", "rss");
+   hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath);
+   cluster = (new MiniDFSCluster.Builder(hdfsConf)).build();
+   Thread.sleep(500L);
+   CoordinatorConf conf = new CoordinatorConf();
+   conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
+   conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 5000);
+   conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+   applicationManager = new ApplicationManager(conf);
+   selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
+   selectStorageStrategy.setConf(hdfsConf);
+   // NOTE(review): presumably lets the sampling scheduler start before the test body - confirm
+   Thread.sleep(1000);
+ }
+
+ /**
+  * Walks a single application through the IO_SAMPLE strategy: initial rank values, ranking of
+  * two paths, first pick, repeated pick, expiry, removal of a path that is still in use, and
+  * final cleanup. The steps depend on each other and on the sleeps, so their order matters.
+  */
+ @Test
+ public void selectStorageTest() throws Exception {
+   FileSystem fs = testFile.getFileSystem(hdfsConf);
+   selectStorageStrategy.setFs(fs);
+
+   String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2;
+   applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+   // default value is 0
+   assertEquals(0,
+       selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage1).getReadAndWriteTime().get());
+   assertEquals(0,
+       selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getReadAndWriteTime().get());
+   // the in-use gauges are looked up by "p1"/"p2", not by the full hdfs:// path
+   String storageHost1 = "p1";
+   assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
+   String storageHost2 = "p2";
+   assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+
+   // compare with two remote path
+   selectStorageStrategy.incRemoteStorageCounter(remoteStorage1);
+   selectStorageStrategy.incRemoteStorageCounter(remoteStorage1);
+   String testApp1 = "testApp1";
+   final long current = System.currentTimeMillis();
+   applicationManager.refreshAppId(testApp1);
+   // create() returns an open output stream; close it right away so it is not leaked
+   fs.create(testFile).close();
+   selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, current);
+   fs.create(testFile).close();
+   selectStorageStrategy.sortPathByRankValue(remoteStorage1, testFile, current);
+   assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+   assertEquals(remoteStorage2, selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath());
+   assertEquals(1,
+       selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+   // picking again for an app that already has an assignment returns the same storage
+   assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+   assertEquals(1,
+       selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+
+   // when the expiration time is reached, the app was removed
+   Thread.sleep(appExpiredTime + 2000);
+   assertNull(selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1));
+   assertEquals(0,
+       selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+
+   // refresh app1 so it gets remoteStorage2 again, then remove remoteStorage2 from the
+   // configuration: the path must stay in the rank map until the app using it has expired
+   applicationManager.refreshAppId(testApp1);
+   assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath());
+   remoteStoragePath = remoteStorage1;
+   applicationManager.refreshRemoteStorage(remoteStoragePath, "");
+   assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1, remoteStorage2)),
+       selectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+   assertTrue(selectStorageStrategy.getRemoteStoragePathRankValue()
+       .get(remoteStorage2).getReadAndWriteTime().get() > 0);
+   assertEquals(1,
+       selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get());
+   // app1 is expired, p2 is removed because of counter = 0
+   Thread.sleep(appExpiredTime + 2000);
+   assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1)),
+       selectStorageStrategy.getRemoteStoragePathRankValue().keySet());
+   // undo the two manual increments made at the start of this test
+   selectStorageStrategy.decRemoteStorageCounter(remoteStorage1);
+   selectStorageStrategy.decRemoteStorageCounter(remoteStorage1);
+   // remove all remote storage
+   applicationManager.refreshRemoteStorage("", "");
+   assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size());
+   assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size());
+   assertFalse(applicationManager.hasErrorInStatusCheck());
+ }
+
+ /**
+  * Picks storage for 3000 distinct app ids from three concurrent threads, waits past the
+  * app expiry time, then removes every storage and checks that no state or status-check
+  * error is left behind.
+  */
+ @Test
+ public void selectStorageMulThreadTest() throws Exception {
+   final FileSystem sampleFs = testFile.getFileSystem(hdfsConf);
+   selectStorageStrategy.setFs(sampleFs);
+   final String threeStorages = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2
+       + Constants.COMMA_SPLIT_CHAR + remoteStorage3;
+   applicationManager.refreshRemoteStorage(threeStorages, "");
+   final String appPrefix = "testAppId";
+
+   // one picker thread per block of 1000 app ids: [0, 1000), [1000, 2000), [2000, 3000)
+   final Thread[] pickers = new Thread[3];
+   for (int slot = 0; slot < pickers.length; slot++) {
+     final int firstId = slot * 1000;
+     final int endId = firstId + 1000;
+     pickers[slot] = new Thread(() -> {
+       for (int id = firstId; id < endId; id++) {
+         String appId = appPrefix + id;
+         applicationManager.refreshAppId(appId);
+         selectStorageStrategy.pickRemoteStorage(appId);
+       }
+     });
+   }
+   // start all pickers before joining any of them so they really run concurrently
+   for (Thread picker : pickers) {
+     picker.start();
+   }
+   for (Thread picker : pickers) {
+     picker.join();
+   }
+   // sleep longer than the app expiry time before removing every storage
+   Thread.sleep(appExpiredTime + 2000);
+
+   applicationManager.refreshRemoteStorage("", "");
+   assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size());
+   assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size());
+   assertFalse(applicationManager.hasErrorInStatusCheck());
+ }
+}
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index cd71326d..f9c263a2 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -95,6 +95,10 @@ This document will introduce how to deploy Uniffle coordinators.
|rss.coordinator.remote.storage.cluster.conf|-|Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'|
|rss.rpc.server.port|-|RPC port for coordinator|
|rss.jetty.http.port|-|Http port for coordinator|
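+|rss.coordinator.remote.storage.select.strategy|APP_BALANCE|Strategy for selecting the remote storage path: `APP_BALANCE` selects by the number of apps on each path, `IO_SAMPLE` selects by the time spent reading and writing a sample file|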
+|rss.coordinator.remote.storage.io.sample.schedule.time|60000|The interval at which the read and write time of each remote storage path is sampled when the `IO_SAMPLE` strategy is used|
+|rss.coordinator.remote.storage.io.sample.file.size|204800000|The size of the file that the scheduled thread reads and writes|
+|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times to read and write HDFS files|
### AccessClusterLoadChecker settings
|Property Name|Default| Description|
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
index c93723c6..73450d7d 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java
@@ -39,6 +39,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -92,10 +93,11 @@ public class FetchClientConfTest extends CoordinatorTestBase {
response = coordinatorClient.fetchClientConf(request);
assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode());
assertEquals(0, response.getClientConf().size());
+ shutdownServers();
}
@Test
- public void testFetchRemoteStorage(@TempDir File tempDir) throws Exception {
+ public void testFetchRemoteStorageByApp(@TempDir File tempDir) throws Exception {
String remotePath1 = "hdfs://path1";
File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
String contItem = "path2,key1=test1,key2=test2";
@@ -139,6 +141,59 @@ public class FetchClientConfTest extends CoordinatorTestBase {
assertEquals(2, remoteStorageInfo.getConfItems().size());
assertEquals("test1", remoteStorageInfo.getConfItems().get("key1"));
assertEquals("test2", remoteStorageInfo.getConfItems().get("key2"));
+ shutdownServers();
+ }
+
+ @Test
+ public void testFetchRemoteStorageByIO(@TempDir File tempDir) throws Exception {
+ String remotePath1 = "hdfs://path1";
+ File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
+ String contItem = "path2,key1=test1,key2=test2";
+ Map<String, String> dynamicConf = Maps.newHashMap();
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath1);
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF.key(), contItem);
+ writeRemoteStorageConf(cfgFile, dynamicConf);
+
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
+ coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString());
+ coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 2);
+ coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 500);
+ coordinatorConf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+ createCoordinatorServer(coordinatorConf);
+ startServers();
+
+ waitForUpdate(Sets.newHashSet(remotePath1), coordinators.get(0).getApplicationManager());
+ String appId = "testFetchRemoteStorageApp";
+ RssFetchRemoteStorageRequest request = new RssFetchRemoteStorageRequest(appId);
+ RssFetchRemoteStorageResponse response = coordinatorClient.fetchRemoteStorage(request);
+ RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo();
+ assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+ assertEquals(remotePath1, remoteStorageInfo.getPath());
+
+ // rewrite the dynamic conf file so that the configured remote storage path switches to remotePath2
+ String remotePath2 = "hdfs://path2";
+ dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath2);
+ writeRemoteStorageConf(cfgFile, dynamicConf);
+ waitForUpdate(Sets.newHashSet(remotePath2), coordinators.get(0).getApplicationManager());
+ request = new RssFetchRemoteStorageRequest(appId);
+ response = coordinatorClient.fetchRemoteStorage(request);
+ // remotePath1 is still returned because the (appId -> remote storage path) mapping is cached
+ remoteStorageInfo = response.getRemoteStorageInfo();
+ assertEquals(remotePath1, remoteStorageInfo.getPath());
+ assertTrue(remoteStorageInfo.getConfItems().isEmpty());
+
+ // sleep to ensure sizeList can be updated (presumably by the scheduled I/O sampling; TODO confirm)
+ Thread.sleep(2000);
+ request = new RssFetchRemoteStorageRequest(appId + "another");
+ response = coordinatorClient.fetchRemoteStorage(request);
+ // a new appId gets remotePath2 together with the conf items declared for it in contItem
+ remoteStorageInfo = response.getRemoteStorageInfo();
+ assertEquals(remotePath2, remoteStorageInfo.getPath());
+ assertEquals(2, remoteStorageInfo.getConfItems().size());
+ assertEquals("test1", remoteStorageInfo.getConfItems().get("key1"));
+ assertEquals("test2", remoteStorageInfo.getConfItems().get("key2"));
+ shutdownServers();
}
private void waitForUpdate(