Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/08/30 08:49:17 UTC

[GitHub] [incubator-uniffle] smallzhongfeng opened a new pull request, #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

smallzhongfeng opened a new pull request, #192:
URL: https://github.com/apache/incubator-uniffle/pull/192

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://github.com/Tencent/Firestorm/blob/master/CONTRIBUTING.md
     2. Ensure you have added or run the appropriate tests for your PR
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]XXXX Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   Solve #186 
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   A better strategy for selecting the remote storage path.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   Three configs are added:
   1. `rss.coordinator.remote.storage.select.strategy` supports `APP` and `IO`. `APP` selects a remote path based on the number of apps assigned to it; `IO` selects based on the time taken to write and read a test file.
   2. `rss.coordinator.remote.storage.schedule.time`: if the user chooses `IO`, a test file is written and read at this regular interval.
   3. `rss.coordinator.remote.storage.file.size`: the size of each test file that is written and read.
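A minimal sketch of how the three settings above might be read and validated. Only the config key names come from the description; `StorageSelectStrategy`, `RemoteStorageSelectConf`, the `APP` default, and the validation rules are hypothetical and not taken from the PR. The schedule time is treated as milliseconds because the quoted diff schedules it with `TimeUnit.MILLISECONDS`.

```java
import java.util.Locale;

/** Hypothetical enum mirroring the two values of rss.coordinator.remote.storage.select.strategy. */
enum StorageSelectStrategy {
  APP, // balance by the number of apps already assigned to each remote path
  IO;  // prefer the remote path with the lowest sampled write/read time

  /** Parses a config value case-insensitively, treating a missing value as APP (assumed default). */
  static StorageSelectStrategy parse(String value) {
    if (value == null || value.trim().isEmpty()) {
      return APP;
    }
    // valueOf throws IllegalArgumentException for anything other than APP or IO.
    return valueOf(value.trim().toUpperCase(Locale.ROOT));
  }
}

/** Hypothetical holder for the three settings; field names are illustrative. */
final class RemoteStorageSelectConf {
  final StorageSelectStrategy strategy;
  final long scheduleTimeMs; // rss.coordinator.remote.storage.schedule.time
  final int fileSizeBytes;   // rss.coordinator.remote.storage.file.size

  RemoteStorageSelectConf(String strategy, long scheduleTimeMs, int fileSizeBytes) {
    this.strategy = StorageSelectStrategy.parse(strategy);
    // The interval and file size only matter when IO probing is enabled.
    if (this.strategy == StorageSelectStrategy.IO && (scheduleTimeMs <= 0 || fileSizeBytes <= 0)) {
      throw new IllegalArgumentException(
          "schedule.time and file.size must be positive when the IO strategy is selected");
    }
    this.scheduleTimeMs = scheduleTimeMs;
    this.fileSizeBytes = fileSizeBytes;
  }
}
```

Validating only under `IO` keeps the `APP` path unaffected by probe settings it never uses.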
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   Added unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963191525


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.

Review Comment:
   > OK, so it would be more accurate to describe it as selecting the more stable path, right?
   
   We don't think we should use the word `more`. We only prove that HDFS is writable and readable.





[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1238855117

   Actually, we don't need to write multiple times in one sample; you can collect multiple samples to calculate the IO cost time.
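One way to follow this suggestion, sketched under assumptions: keep a fixed-size window of recent per-probe costs for each remote path and rank by the window average instead of by a single probe. `IoCostWindow` and its capacity are hypothetical, not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical fixed-size window of IO cost samples (in ms) for one remote path. */
final class IoCostWindow {
  private final int capacity;
  private final Deque<Long> samples = new ArrayDeque<>();
  private long sum;

  IoCostWindow(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  /** Records one successful probe, evicting the oldest sample once the window is full. */
  synchronized void add(long costMs) {
    if (samples.size() == capacity) {
      sum -= samples.removeFirst();
    }
    samples.addLast(costMs);
    sum += costMs;
  }

  /** Mean of the retained samples; Double.MAX_VALUE until the first sample arrives. */
  synchronized double average() {
    return samples.isEmpty() ? Double.MAX_VALUE : (double) sum / samples.size();
  }
}
```

Failed probes are probably better tracked with a separate health flag than fed in as `Long.MAX_VALUE`, which would overflow the running sum.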




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234168924

   ![image](https://user-images.githubusercontent.com/84573424/187906786-62a22d53-7247-454f-b421-9d687d6388f2.png)
   Sorry, I can't disclose too much, but this is a related log.




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236121048

   > Because the original strategy of allocating paths based on the number of apps still exists, you only need to change the config to keep the original behavior. Meanwhile, other users like us configure multiple paths mainly as a means of disaster recovery; if they also want to write to the faster cluster as much as possible, isn't the new strategy more suitable?
   
   OK, but we should give the class a better name and add some comments explaining why we need this strategy.




[GitHub] [incubator-uniffle] colinmjj commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
colinmjj commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236509050

   @smallzhongfeng For this update, I agree with @jerqi on adding a common interface with a default strategy. I prefer to keep common strategies in the project; a specific strategy can be implemented by users as a plugin.
   The current strategy for testing HDFS's workload depends on the communication between the Coordinator and the DataNodes, so it can't tell whether a shuffle server has a problem with HDFS. On the other hand, we don't know when the shuffle process will happen after registration (maybe 10s, maybe 1 hr), and the situation may differ from the one at registration.
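A rough sketch of the plugin idea: a small interface, a built-in default that balances by app count, and a loader that instantiates a user-supplied class by name. `RemoteStoragePicker`, `AppCountPicker` and `PickerLoader` are hypothetical names; the actual `SelectStorageStrategy` interface in Uniffle may have a different shape.

```java
import java.util.Map;

/** Hypothetical plugin contract: choose one remote path for a new application. */
interface RemoteStoragePicker {
  String pick(Map<String, Integer> appCountByPath);
}

/** Built-in default: the path currently serving the fewest apps. */
final class AppCountPicker implements RemoteStoragePicker {
  @Override
  public String pick(Map<String, Integer> appCountByPath) {
    return appCountByPath.entrySet().stream()
        .min(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey)
        .orElseThrow(() -> new IllegalStateException("no remote storage configured"));
  }
}

/** Loads a user-provided strategy by class name, falling back to the default. */
final class PickerLoader {
  static RemoteStoragePicker load(String className) {
    if (className == null || className.isEmpty()) {
      return new AppCountPicker();
    }
    try {
      Class<?> clazz = Class.forName(className);
      // The plugin is assumed to have a no-arg constructor.
      return (RemoteStoragePicker) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("cannot load strategy " + className, e);
    }
  }
}
```

Keeping only the common strategy in-tree and loading anything else by class name is one way to let site-specific probing live outside the project.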




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233983489

   Maybe you can collect the write speed from shuffle servers.
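If shuffle servers reported their own write stats, the coordinator could aggregate them per remote path, reflecting what the shuffle servers actually experience rather than the coordinator's own connection to HDFS. This sketch assumes each report carries the path, the bytes written, and the time spent; `ReportedWriteSpeed` and the reporting channel (for example, a heartbeat field) are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical coordinator-side aggregation of write stats reported by shuffle servers. */
final class ReportedWriteSpeed {
  private static final class Totals {
    final LongAdder bytes = new LongAdder();
    final LongAdder costMs = new LongAdder();
  }

  private final Map<String, Totals> totalsByPath = new ConcurrentHashMap<>();

  /** Records one report for a remote path. */
  void report(String remotePath, long bytesWritten, long costMs) {
    Totals t = totalsByPath.computeIfAbsent(remotePath, p -> new Totals());
    t.bytes.add(bytesWritten);
    t.costMs.add(costMs);
  }

  /** Average write speed in bytes per millisecond; 0 if nothing has been reported. */
  double bytesPerMs(String remotePath) {
    Totals t = totalsByPath.get(remotePath);
    if (t == null) {
      return 0.0;
    }
    // Treat sub-millisecond totals as 1 ms to avoid dividing by zero.
    long cost = Math.max(t.costMs.sum(), 1L);
    return (double) t.bytes.sum() / cost;
  }
}
```

The two `LongAdder`s are updated separately, so a concurrent reader can briefly see one updated without the other; that seems acceptable for a ranking heuristic.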




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235103451

   > If we use the HDFS metrics system, which metric do you think is better: the length of the `callqueue`, or the ratio of DataNode load to the number of DataNodes?
   
   Maybe you can ask your company's internal HDFS colleagues.
   




[GitHub] [incubator-uniffle] whbing commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
whbing commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235456858

   > If you just use the single w/r time to judge whether HDFS is usable or not, it's OK. But if you use the single w/r time as the metric of system load, that seems wrong. Could we get some metrics of HDFS? They may be more accurate. I don't know whether HDFS provides similar metrics.
   
   Just my opinion: our goal is to choose a faster cluster, and from this point of view it makes the most sense to evaluate by w/r speed. Many factors affect w/r speed, such as NameNode RPC, DataNode IO, and the network, but we don't need to care about them individually.




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1239102520

   If we increase the number of samples and you insist on comparing IO samples, could we change the class name from `HealthSelectStorageStrategy` to `LowestIOSampleSelectStorageStrategy`?
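A sketch of what the pick step of a `LowestIOSampleSelectStorageStrategy` could look like, assuming each path carries a health flag from its latest probe, an averaged IO cost, and an app count. `PathStats` and `LowestIoSamplePicker` are hypothetical; the tie-break on app count and the fallback to app-count balancing when no path is healthy are illustrative design choices, not the PR's behavior.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical per-path snapshot; the PR's RankValue plays a similar role. */
final class PathStats {
  final String path;
  final boolean healthy;    // latest probe wrote and read back the same bytes
  final double avgIoCostMs; // average over recent samples
  final int appNum;         // apps currently assigned to this path

  PathStats(String path, boolean healthy, double avgIoCostMs, int appNum) {
    this.path = path;
    this.healthy = healthy;
    this.avgIoCostMs = avgIoCostMs;
    this.appNum = appNum;
  }
}

final class LowestIoSamplePicker {
  static Optional<String> pick(List<PathStats> candidates) {
    // Lowest average IO cost first; fewer apps breaks ties.
    Comparator<PathStats> byCostThenApps =
        Comparator.comparingDouble((PathStats s) -> s.avgIoCostMs)
            .thenComparingInt(s -> s.appNum);
    Optional<String> best = candidates.stream()
        .filter(s -> s.healthy)
        .min(byCostThenApps)
        .map(s -> s.path);
    if (best.isPresent()) {
      return best;
    }
    // No healthy path: fall back to plain app-count balancing.
    return candidates.stream()
        .min(Comparator.comparingInt((PathStats s) -> s.appNum))
        .map(s -> s.path);
  }
}
```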




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237606056

   > Is `IOSampleSelectStorageStrategy` a better name? `HealthSelectStorageStrategy` is a little general.
   
   I still worry about the name... neither `HealthSelectStorageStrategy` nor `IOSampleSelectStorageStrategy` is good enough.




[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963239722


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.
+ * Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ * If there is an exception when getting the filesystem at the beginning,
+ * roll back to using AppBalanceSelectStorageStrategy.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    try {
+      if (remoteStoragePathRankValue.size() > 1) {
+        for (String path : remoteStoragePathRankValue.keySet()) {
+          Path remotePath = new Path(path);
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          Path testPath = new Path(path + "/rssTest");
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            byte[] data = RandomUtils.nextBytes(fileSize);
+
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) {
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error ", e);
+            RankValue rankValue = remoteStoragePathRankValue.get(path);
+            remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+          } finally {
+            sortPathByIORank(path, testPath, startWriteTime);
+          }
+        }
+      } else {
+        sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+      }
+    } catch (Exception e) {
+      LOG.error("Some error happened, compare the number of apps to select a remote path", e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByIORank(String path, Path testPath, long startWrite) {
+    try {
+      fs.deleteOnExit(testPath);
+      long totalTime = System.currentTimeMillis() - startWrite;
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get()));
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(
+              entry -> entry.getValue().getRatioValue().get())).collect(Collectors.toList());
+    } catch (Exception e) {
+      LOG.error("Failed to delete directory, "
+          + "we should compare the number of apps to select a remote path" + sizeList, e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * the strategy of pick remote storage is according to assignment count

Review Comment:
   Updated.
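Reading `checkReadAndWrite` and `sortPathByIORank` in the diff above, two details seem worth a second look: the `finally` block calls `sortPathByIORank`, which stores the elapsed time and so appears to overwrite the `Long.MAX_VALUE` penalty set in the `catch` block or on a byte mismatch; and `fs.deleteOnExit(testPath)` only schedules the file for deletion when the `FileSystem` is closed. The sketch below returns the failure as the final result and deletes immediately. It uses `java.nio.file` on a local directory as a stand-in for Hadoop's `FileSystem`; with HDFS one would presumably use `fs.create`, `FSDataInputStream.readFully` and `fs.delete(testPath, false)`. `StorageProbe` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Random;

final class StorageProbe {
  /** Outcome of one write-then-read probe against a single storage path. */
  static final class Result {
    final boolean healthy;
    final long costMs; // Long.MAX_VALUE when the probe failed

    Result(boolean healthy, long costMs) {
      this.healthy = healthy;
      this.costMs = costMs;
    }
  }

  private static final Random RANDOM = new Random();

  /** Writes fileSize random bytes under dir, reads them back, and compares them. */
  static Result probe(Path dir, int fileSize) {
    Path testFile = dir.resolve("rssTest");
    byte[] data = new byte[fileSize];
    RANDOM.nextBytes(data);
    long start = System.nanoTime();
    try {
      Files.write(testFile, data);
      byte[] readBack = Files.readAllBytes(testFile);
      long costMs = (System.nanoTime() - start) / 1_000_000L;
      // Only a byte-for-byte match counts as healthy; a mismatch is never replaced by the timing.
      return Arrays.equals(data, readBack)
          ? new Result(true, costMs)
          : new Result(false, Long.MAX_VALUE);
    } catch (IOException e) {
      return new Result(false, Long.MAX_VALUE);
    } finally {
      // Delete right away instead of deferring it until the filesystem closes.
      try {
        Files.deleteIfExists(testFile);
      } catch (IOException ignored) {
        // Best effort: the next probe overwrites the same file anyway.
      }
    }
  }
}
```

The caller would then update the rank from `Result` once, after the probe, so the timing can no longer mask a failure.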





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233799261

   The core goal of this PR is to give multiple HDFS paths some disaster-tolerance capability. IIRC, HDFS metrics can only be obtained through the REST API, and fetching them over HTTP may not be very stable. WDYT?




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233990280

   I think there are many metrics that can directly reflect the load on the cluster.




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963205819


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.
+ * Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ * If there is an exception when getting the filesystem at the beginning,
+ * roll back to using AppBalanceSelectStorageStrategy.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    try {
+      if (remoteStoragePathRankValue.size() > 1) {
+        for (String path : remoteStoragePathRankValue.keySet()) {
+          Path remotePath = new Path(path);
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          Path testPath = new Path(path + "/rssTest");
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            byte[] data = RandomUtils.nextBytes(fileSize);
+
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) {
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error ", e);
+            RankValue rankValue = remoteStoragePathRankValue.get(path);
+            remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+          } finally {
+            sortPathByIORank(path, testPath, startWriteTime);
+          }
+        }
+      } else {
+        sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+      }
+    } catch (Exception e) {
+      LOG.error("Some error happened, compare the number of apps to select a remote path", e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByIORank(String path, Path testPath, long startWrite) {
+    try {
+      fs.deleteOnExit(testPath);
+      long totalTime = System.currentTimeMillis() - startWrite;
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get()));
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(
+              entry -> entry.getValue().getRatioValue().get())).collect(Collectors.toList());
+    } catch (Exception e) {
+      LOG.error("Failed to delete directory, "
+          + "we should compare the number of apps to select a remote path" + sizeList, e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * the strategy of pick remote storage is according to assignment count

Review Comment:
   Is the comment correct?
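The question above is about the Javadoc on the selection method. Under the IO strategy, the diff records a measured I/O time per path and sorts on it, so paths are not ranked purely by assignment count. Below is a minimal sketch of cost-first ordering. It uses a hypothetical `PathRank` holder in place of the PR's `RankValue`, and it assumes app count is used only to break ties. The diff itself sorts on `getRatioValue()`, whose definition is not shown in this excerpt.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the PR's RankValue: sampled I/O cost plus assigned app count.
final class PathRank {
  static final long FAILED = Long.MAX_VALUE; // sentinel for a path whose probe failed

  final long costMillis;
  final int appNum;

  PathRank(long costMillis, int appNum) {
    this.costMillis = costMillis;
    this.appNum = appNum;
  }
}

final class IoCostRanking {
  // Orders paths by lowest sampled I/O cost first; ties are broken by fewer assigned apps.
  static List<String> rank(Map<String, PathRank> ranks) {
    List<Map.Entry<String, PathRank>> entries = new ArrayList<>(ranks.entrySet());
    entries.sort(Comparator
        .comparingLong((Map.Entry<String, PathRank> e) -> e.getValue().costMillis)
        .thenComparingInt(e -> e.getValue().appNum));
    List<String> paths = new ArrayList<>();
    for (Map.Entry<String, PathRank> e : entries) {
      paths.add(e.getKey());
    }
    return paths;
  }

  // Returns the preferred healthy path, or null when every probe failed.
  static String pick(Map<String, PathRank> ranks) {
    for (String path : rank(ranks)) {
      if (ranks.get(path).costMillis != PathRank.FAILED) {
        return path;
      }
    }
    return null;
  }
}
```

A failed probe is stored with the `Long.MAX_VALUE` sentinel, as the diff's `catch` block does. Such paths therefore sort last, and `pick` skips them entirely.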



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.
+ * Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ * If there is an exception when getting the filesystem at the beginning,
+ * roll back to using AppBalanceSelectStorageStrategy.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    try {
+      if (remoteStoragePathRankValue.size() > 1) {
+        for (String path : remoteStoragePathRankValue.keySet()) {
+          Path remotePath = new Path(path);
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          Path testPath = new Path(path + "/rssTest");
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            byte[] data = RandomUtils.nextBytes(fileSize);
+
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) {
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error ", e);
+            RankValue rankValue = remoteStoragePathRankValue.get(path);
+            remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+          } finally {
+            sortPathByIORank(path, testPath, startWriteTime);
+          }
+        }
+      } else {
+        sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+      }
+    } catch (Exception e) {
+      LOG.error("Some error happened, compare the number of apps to select a remote path", e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByIORank(String path, Path testPath, long startWrite) {
+    try {
+      fs.deleteOnExit(testPath);

Review Comment:
   Why do you use `deleteOnExit`?
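Some context for this question: in Hadoop, `FileSystem.deleteOnExit(Path)` only marks a path for deletion when that `FileSystem` instance is closed. With a long-lived, cached filesystem in the coordinator, the probe file can therefore linger until shutdown.

The sketch below shows the alternative of deleting the probe immediately after it is checked. It uses `java.nio.file` on a local path as a stand-in for the Hadoop `FileSystem`; with Hadoop, the analogous call would be `fs.delete(testPath, false)`. The sketch also differs from the diff in two other ways:

- It verifies that exactly the written bytes come back.
- It returns either the elapsed time or the failure sentinel, never both. In the diff, the `finally` block calls `sortPathByIORank`, which stores the elapsed time and so appears to overwrite the `Long.MAX_VALUE` set in the `catch` block.

`ProbeFile` is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

final class ProbeFile {
  static final long FAILED = Long.MAX_VALUE;

  // Writes `size` random bytes, reads them back, verifies them, and deletes the probe
  // right away. Returns the elapsed milliseconds, or FAILED on any error or mismatch.
  static long probe(Path probePath, int size) {
    byte[] data = new byte[size];
    ThreadLocalRandom.current().nextBytes(data);
    long start = System.nanoTime();
    try {
      try (OutputStream out = Files.newOutputStream(probePath)) {
        out.write(data);
      }
      byte[] readBack = new byte[size];
      try (InputStream in = Files.newInputStream(probePath)) {
        int offset = 0;
        while (offset < size) {
          int n = in.read(readBack, offset, size - offset);
          if (n == -1) {
            return FAILED; // file is shorter than what was written
          }
          offset += n;
        }
        if (in.read() != -1) {
          return FAILED; // file is longer than what was written
        }
      }
      if (!Arrays.equals(data, readBack)) {
        return FAILED; // content mismatch
      }
      return (System.nanoTime() - start) / 1_000_000;
    } catch (IOException e) {
      return FAILED;
    } finally {
      try {
        Files.deleteIfExists(probePath); // clean up now, not when the process exits
      } catch (IOException ignored) {
        // a leftover probe is harmless; the next probe truncates and overwrites it
      }
    }
  }
}
```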



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236114797

   The original strategy, which allocates paths based on the number of apps, still exists, so users who want that behavior only need to change a parameter. Other users, like us, configure multiple paths mainly as a means of disaster recovery. If they also want data written to the faster cluster whenever possible, isn't the new strategy more suitable?
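The app-count strategy mentioned here keeps each application on a single path (`appIdToRemoteStorageInfo` in the diff) and otherwise favors the least-loaded path. Below is a minimal sketch of that idea. It uses a hypothetical `AppBalanceSelector` that tracks plain path strings rather than `RemoteStorageInfo` objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class AppBalanceSelector {
  // remote path -> number of apps currently assigned to it
  private final Map<String, AtomicInteger> appCounts = new ConcurrentHashMap<>();
  // appId -> chosen path, so all shuffle data of one app lands on one storage
  private final Map<String, String> appToPath = new ConcurrentHashMap<>();

  AppBalanceSelector(Iterable<String> paths) {
    for (String p : paths) {
      appCounts.put(p, new AtomicInteger());
    }
  }

  // Returns the path already bound to appId, or binds it to the least-loaded path.
  String pick(String appId) {
    return appToPath.computeIfAbsent(appId, id -> {
      String best = null;
      int bestCount = Integer.MAX_VALUE;
      for (Map.Entry<String, AtomicInteger> e : appCounts.entrySet()) {
        int count = e.getValue().get();
        if (count < bestCount) {
          best = e.getKey();
          bestCount = count;
        }
      }
      if (best != null) {
        appCounts.get(best).incrementAndGet();
      }
      return best;
    });
  }

  // Releases the binding when the app finishes, freeing capacity on its path.
  void remove(String appId) {
    String path = appToPath.remove(appId);
    if (path != null) {
      appCounts.get(path).decrementAndGet();
    }
  }
}
```

The binding is made inside `computeIfAbsent`, so repeated calls for the same app return the same path. Two different apps arriving concurrently may see the same minimum, so the balance is approximate. That is usually acceptable for this kind of placement.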




[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962670064


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -125,7 +125,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY = ConfigOptions
+      .key("rss.coordinator.remote.storage.select.strategy")
+      .stringType()
+      .defaultValue("IO")

Review Comment:
   OK. I will change it.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964525041


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write and read. Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) {

Review Comment:
   OK.
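The revised diff repeats the probe `readAndWriteTimes` times. How the samples are combined is not visible in this excerpt. One simple option, sketched here as an assumption, is to average the samples and treat any failed sample as a failure of the whole path. `SampledCost` and the `LongSupplier` probe are hypothetical.

```java
import java.util.function.LongSupplier;

final class SampledCost {
  static final long FAILED = Long.MAX_VALUE;

  // Runs `probe` up to `times` times and returns the mean cost in milliseconds.
  // A single failed sample marks the whole path as failed and stops sampling early.
  static long averageCost(LongSupplier probe, int times) {
    if (times <= 0) {
      throw new IllegalArgumentException("times must be positive");
    }
    long total = 0;
    for (int i = 0; i < times; i++) {
      long cost = probe.getAsLong();
      if (cost == FAILED) {
        return FAILED;
      }
      total += cost;
    }
    return total / times;
  }
}
```

Averaging several samples reduces the noise of a single small probe. It still measures a short round trip rather than sustained cluster throughput, which is the concern raised elsewhere in this review.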





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1239250206

   I'm sorry for the trouble my insistence has caused you, but this does have real use cases at our company. For now, `LowestIOSampleCostSelectStorageStrategy` may be more suitable. I will change it.




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234185242

   At present, we read and write only 10 MB of data, so in theory the probe is fast, and it has nothing to do with the number of nodes. Read and write speed should depend on disk performance, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234168437

   ![image](https://user-images.githubusercontent.com/84573424/187906483-7fb79b7e-07a1-44da-b919-14587b71cc8c.png)
   Sorry, I can't disclose too much, but this is a related log.




[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962459717


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -125,7 +125,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY = ConfigOptions
+      .key("rss.coordinator.remote.storage.select.strategy")
+      .stringType()
+      .defaultValue("IO")

Review Comment:
   Of course, we can let users choose for themselves, but I prefer the IO strategy, because it avoids some risks better than the APP strategy does.





[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962670064


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -125,7 +125,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY = ConfigOptions
+      .key("rss.coordinator.remote.storage.select.strategy")
+      .stringType()
+      .defaultValue("IO")

Review Comment:
   OK.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962883449


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.

Review Comment:
   I don't think the measurement this strategy takes can tell me which HDFS cluster is faster.





[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963227690


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.schedule.time")

Review Comment:
   Updated.
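The option is now declared as `enumType(ApplicationManager.StrategyName.class)` with a default of `APP_BALANCE`, so the configured string has to map onto an enum constant. The project's `ConfigOptions` performs that conversion itself. The sketch below only illustrates the mapping and fallback behavior. Both the `IO_SAMPLE` constant name and the case-insensitive matching are assumptions, and `StrategyConfig` is a hypothetical helper.

```java
import java.util.Locale;

// Hypothetical mirror of ApplicationManager.StrategyName; IO_SAMPLE is an assumed name.
enum StrategyName { APP_BALANCE, IO_SAMPLE }

final class StrategyConfig {
  static final StrategyName DEFAULT = StrategyName.APP_BALANCE;

  // Maps the raw value of rss.coordinator.remote.storage.select.strategy to an enum.
  // Falls back to the default when unset or blank; unknown values fail fast.
  static StrategyName parse(String raw) {
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT;
    }
    try {
      return StrategyName.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unknown remote storage select strategy: " + raw, e);
    }
  }
}
```

Under this sketch, the earlier string default `"IO"` matches neither constant. It would be rejected rather than silently falling back to `APP_BALANCE`.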





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964337093


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * When allocating apps to different remote paths, HealthSelectStorageStrategy only considers
+ * remote paths that can be both written and read. Therefore, all apps may end up being written to the same cluster.
+ * Meanwhile, if a cluster has read or write exceptions, it is automatically avoided.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) {

Review Comment:
   If we always write the same file, it will be allocated to the same DataNode (DN), so our sample will only test one DN.
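One way to act on this comment is to give every iteration its own file instead of rewriting `path + "/rssTest"`. The sketch below is a minimal illustration. It assumes a hypothetical `probePaths` helper that is not part of the PR. Each name refers to a new file, so the NameNode places that file's blocks independently. Depending on the cluster's block placement policy, this should spread the probes over more DataNodes. The actual write/read/delete calls against Hadoop `FileSystem` are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical helper: one distinct probe file per read/write iteration. */
final class ProbePaths {
  private ProbePaths() {}

  static List<String> probePaths(String remoteBase, int times) {
    List<String> paths = new ArrayList<>(times);
    for (int i = 0; i < times; i++) {
      // The iteration index plus a random UUID keeps names unique across
      // iterations and across processes probing the same path concurrently.
      paths.add(remoteBase + "/rssTest-" + i + "-" + UUID.randomUUID());
    }
    return paths;
  }
}
```

Each probe file should still be deleted after it is read back, so the probes do not accumulate on the remote storage.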



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1231500007

   Sampling the I/O cost time may not be accurate enough.




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234033509

   It seems difficult to determine a reasonable threshold, so it may be better to directly compare the time consumption of different paths. @zuston 
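You can compare paths against each other instead of against a fixed threshold. A simple way to do that is to take the minimum of the averaged costs. The sketch below assumes the average cost per path (in milliseconds) has already been measured. The `FastestPath` class is illustrative and is not the PR's code.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: picks the path with the lowest measured average cost. */
final class FastestPath {
  private FastestPath() {}

  // No absolute threshold: paths are only ranked relative to each other.
  static Optional<String> pickFastest(Map<String, Long> avgCostMillisByPath) {
    return avgCostMillisByPath.entrySet().stream()
        .min(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey);
  }
}
```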




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234412756

   How should we evaluate the throughput of an HDFS cluster? I would use the throughput of a single DataNode * the number of DataNodes, and not weigh the time of a single write and read too much.
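The rough estimate suggested here is a single multiplication. Below is a sketch with a hypothetical helper name. The per-DataNode figure is assumed to come from measurement or hardware specs.

```java
/** Hypothetical helper for the rough capacity estimate suggested above. */
final class ThroughputEstimate {
  private ThroughputEstimate() {}

  // Aggregate throughput ~= per-DataNode throughput * number of DataNodes.
  static double clusterThroughputMBps(double perDataNodeMBps, int dataNodes) {
    if (perDataNodeMBps < 0 || dataNodes < 0) {
      throw new IllegalArgumentException("inputs must be non-negative");
    }
    return perDataNodeMBps * dataNodes;
  }
}
```

With made-up numbers, `clusterThroughputMBps(100, 50)` gives 5000 MB/s and `clusterThroughputMBps(200, 20)` gives 4000 MB/s. The first cluster has more aggregate capacity, even though a single read/write probe would likely look faster on the second.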




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236516820

   > > > OK, but we should give the class a better name and add some comments to explain why we need this strategy.
   > > 
   > > 
   > > OK. I'll add some comments later. What do you suggest for the name of this class?
   > 
   > like `AvailabilitySelectStorageStrategy` or `HealthSelectStorageStrategy`, but I don't think these names are good enough.
   
   Maybe `HealthSelectStorageStrategy` would be better than `AvailabilitySelectStorageStrategy`, because availability should also take the HDFS service capacity into account.




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963207572


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.schedule.time")

Review Comment:
   `rss.coordinator.remote.storage.schedule.time` -> `rss.coordinator.remote.storage.health.schedule.time`





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1239321736

   > Please update the description.
   
   Updated.




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1238296338

   Your suggestion works for me. I added a parameter `rss.coordinator.remote.storage.access.times` that sets how many times the files are read and written. The default file size is 200 MB.




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237775007

   The interface is OK with me, but I can hardly accept the strategy. Maybe you could change only the interface in this PR and keep the strategy in your internal implementation.




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234165550

   That's fine. Maybe you're right; after all, this is part of the discussion. For reference, this solution has been deployed in our production cluster for nearly a month.




[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964760960


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.health.schedule.time")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("The interval, in milliseconds, at which the read and write time of each remote path (HDFS cluster) is measured");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_FILE_SIZE = ConfigOptions
+      .key("rss.coordinator.remote.storage.file.size")
+      .intType()
+      .defaultValue(204800 * 1000)
+      .withDescription("The size of the file that the scheduled thread reads and writes");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES = ConfigOptions
+      .key("rss.coordinator.remote.storage.access.times")
+      .intType()
+      .defaultValue(3)
+      .withDescription("The number of times to read and write HDFS files");

Review Comment:
   OK.





[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964772824


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.health.schedule.time")

Review Comment:
   done.





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237727891

   Because all shuffle servers access the same HDFS path at the same time, this is actually unnecessary. The read/write process is not only about measuring its time, but also about checking whether reads and writes succeed. Whether exceptions occur while reading and writing is effectively an extra check on whether the cluster is abnormal.
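The sketch below shows a probe that records both outcomes described above: elapsed time and success or failure. It is a minimal illustration. It assumes the caller wraps the actual HDFS write/read in a `Callable`, and `ProbeResult` and `HealthProbe` are hypothetical names, not classes from the PR.

```java
import java.util.concurrent.Callable;

/** Hypothetical result of one probe: whether it succeeded and how long it took. */
final class ProbeResult {
  final boolean healthy;
  final long elapsedNanos;

  ProbeResult(boolean healthy, long elapsedNanos) {
    this.healthy = healthy;
    this.elapsedNanos = elapsedNanos;
  }
}

final class HealthProbe {
  private HealthProbe() {}

  // Times one read/write round trip. Any exception marks the path unhealthy,
  // so the probe doubles as an availability check, not just a speed sample.
  static ProbeResult run(Callable<Void> readWriteRoundTrip) {
    long start = System.nanoTime();
    try {
      readWriteRoundTrip.call();
      return new ProbeResult(true, System.nanoTime() - start);
    } catch (Exception e) {
      return new ProbeResult(false, System.nanoTime() - start);
    }
  }
}
```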


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237778306

   If you want to use I/O sampling, please change your sampling method. One sample is not enough, and 10 MB may be too small. The probe size must be larger than the single-buffer flush threshold.
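This comment sets two constraints: take several samples, and use a probe size above the flush threshold. Both can be checked up front. The sketch below makes two assumptions. `flushThresholdBytes` stands for whatever single-buffer flush threshold the shuffle servers are configured with. `oneProbeMillis` performs one timed write/read. Neither name comes from the PR.

```java
import java.util.function.LongSupplier;

/** Hypothetical sampler that averages several timed probes. */
final class IoSampler {
  private IoSampler() {}

  static double averageCostMillis(long probeBytes, long flushThresholdBytes,
                                  int times, LongSupplier oneProbeMillis) {
    // A probe no larger than one buffer flush says little about real shuffle writes.
    if (probeBytes <= flushThresholdBytes) {
      throw new IllegalArgumentException("probe size must exceed the flush threshold");
    }
    // A single sample is too noisy, so require at least two.
    if (times < 2) {
      throw new IllegalArgumentException("use more than one sample");
    }
    long total = 0;
    for (int i = 0; i < times; i++) {
      total += oneProbeMillis.getAsLong();
    }
    return (double) total / times;
  }
}
```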


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237708842

   In general, I can hardly accept using sampled I/O speed to judge the speed of HDFS. It's also a little weird to use this strategy to judge whether HDFS is OK; judging storage health should be a common capability. Maybe we should use the shuffle server to judge whether HDFS is OK.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237758874

   For us, a larger and faster cluster should receive more data, instead of data being written to two clusters evenly. This PR only provides an additional strategy. If we want to modify the original policy, maybe we can add that logic in another PR?
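The goal here is for a faster cluster to receive *more* data, not *all* of it. One option is to weight the choice by speed. This is an alternative to always picking the fastest path, and it is not what this PR implements. The sketch below assumes per-path average costs are already measured. Each path gets weight `1 / cost`, so a path that is twice as fast is picked roughly twice as often.

```java
import java.util.Map;
import java.util.Random;

/** Hypothetical picker: chooses paths with probability proportional to 1 / cost. */
final class WeightedPathPicker {
  private WeightedPathPicker() {}

  static String pick(Map<String, Double> avgCostMillisByPath, Random rnd) {
    double total = 0;
    for (double cost : avgCostMillisByPath.values()) {
      if (cost <= 0) {
        throw new IllegalArgumentException("costs must be positive");
      }
      total += 1.0 / cost;
    }
    // Walk the cumulative weights until the random point is passed.
    double r = rnd.nextDouble() * total;
    String last = null;
    for (Map.Entry<String, Double> e : avgCostMillisByPath.entrySet()) {
      last = e.getKey();
      r -= 1.0 / e.getValue();
      if (r < 0) {
        return last;
      }
    }
    return last; // reached only on floating-point rounding, or null for an empty map
  }
}
```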
   




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235596044

   > > If you just use the single w/r time to judge whether HDFS is usable or not, it's ok. But you use the single w/r time as the metric of system load, it seems wrong. Could we get some metrics of HDFS? It may be more accurate. I don't know whether HDFS provide similar metrics.
   > 
   > @jerqi Just my opinion, our goal is to choose a faster cluster, and from this point of view it makes the most sense to use w/r speed for evaluation. There are many factors that affect the w/r speed, such as NameNode RPC, DataNode IO, Network, we don't need to care about these.
   
   W/R speed is OK. But I don't think we can measure the speed by writing 10 MB of data every few minutes. @smallzhongfeng We also need to avoid too many applications choosing the same HDFS. Many applications start at 0 o'clock, and with your strategy they will all choose the fastest HDFS at that time. We met a similar problem before, when we allocated too many applications to shuffle servers, so we raised a PR: https://github.com/Tencent/Firestorm/pull/3/files.
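One hedged way to handle the midnight burst is to make a path less attractive as apps pile onto it. The score below multiplies the measured cost by one plus the number of apps already assigned. This is a hypothetical illustration, not the approach taken in the linked Firestorm PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical picker that trades off measured I/O cost against current assignments. */
final class LoadAwarePicker {
  private final Map<String, Integer> assignedApps = new HashMap<>();

  // score = avgCost * (1 + appsAlreadyAssigned). The lowest score wins and its
  // counter is bumped, so a burst of new apps is spread over several paths.
  synchronized String assign(Map<String, Double> avgCostMillisByPath) {
    String best = null;
    double bestScore = Double.MAX_VALUE;
    for (Map.Entry<String, Double> e : avgCostMillisByPath.entrySet()) {
      int apps = assignedApps.getOrDefault(e.getKey(), 0);
      double score = e.getValue() * (1 + apps);
      if (score < bestScore) {
        bestScore = score;
        best = e.getKey();
      }
    }
    if (best != null) {
      assignedApps.merge(best, 1, Integer::sum);
    }
    return best;
  }
}
```

Take costs of 100 ms for `hdfs://p1` and 50 ms for `hdfs://p2`, iterated in that order. Six consecutive calls return p2, p1, p2, p2, p1, p2, so the faster path gets about two thirds of the apps instead of all of them. A real version would also need to decrement the counts when apps expire.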


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962453592


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -125,7 +125,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY = ConfigOptions
+      .key("rss.coordinator.remote.storage.select.strategy")
+      .stringType()
+      .defaultValue("IO")

Review Comment:
   Could we use "APP" as the default value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233981808

   > I think what you said is reasonable. I agree to split this into two PRs. However, at present the load of the cluster cannot be perfectly evaluated, whereas the read/write time can directly compare the I/O speed of the clusters. Moreover, among the HDFS metrics there seems to be none that directly reflects the pressure on the cluster.
   
   Read/write time can't reflect the overall state of the cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234168444

   ![image](https://user-images.githubusercontent.com/84573424/187906483-7fb79b7e-07a1-44da-b919-14587b71cc8c.png)
   Sorry, I can't disclose it, but this is a related log.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233640578

   If you just use the single w/r time to judge whether HDFS is usable or not, it's OK. But using the single w/r time as a metric of system load seems wrong. Could we get some HDFS metrics instead? They may be more accurate. I don't know whether HDFS provides similar metrics.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234055359

   > I think there are many metrics that can directly reflect the pressure of the cluster.
   
   There are indeed some metrics that can reflect the load of the cluster, but isn't the read/write time more critical for these temporary shuffle files?
   




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963208796


##########
coordinator/src/test/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategyTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.io.File;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.common.util.Constants;
+
+import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.HEALTH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HealthSelectStorageStrategyTest {
+
+  private HealthSelectStorageStrategy healthSelectStorageStrategy;
+  private ApplicationManager applicationManager;
+  private static final Configuration hdfsConf = new Configuration();
+  private static MiniDFSCluster cluster;
+  private final long appExpiredTime = 2000L;
+  private final String remoteStorage1 = "hdfs://p1";
+  private final String remoteStorage2 = "hdfs://p2";
+  private final String remoteStorage3 = "hdfs://p3";
+  private final Path testFile = new Path("test");
+
+  @TempDir
+  private static File remotePath = new File("hdfs://rss");
+
+  @BeforeAll
+  public static void setup() {
+    CoordinatorMetrics.register();
+  }
+
+  @AfterAll
+  public static void clear() {
+    CoordinatorMetrics.clear();
+  }
+
+  @BeforeEach
+  public void init() throws Exception {
+    setUpHdfs(remotePath.getAbsolutePath());

Review Comment:
   Do we close the HDFS cluster?
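   A minimal, JDK-only sketch of the guarantee this question asks for. The cluster's lifetime is tied to a try-with-resources block, so shutdown runs even when the test body throws. `FakeCluster`, `ClusterHandle` and `withCluster` are hypothetical stand-ins. In the real test, closing would call `MiniDFSCluster.shutdown()`, typically from an `@AfterEach` method (or `@AfterAll` for a shared cluster).

```java
import java.util.function.Consumer;

final class ClusterCleanupSketch {

  /** Hypothetical stand-in for MiniDFSCluster that only tracks whether it is running. */
  static final class FakeCluster {
    private boolean running = true;

    boolean isRunning() {
      return running;
    }

    void shutdown() {
      running = false;
    }
  }

  /** Ties the cluster's lifetime to a try-with-resources block. */
  static final class ClusterHandle implements AutoCloseable {
    final FakeCluster cluster = new FakeCluster();

    @Override
    public void close() {
      // Runs whether the test body returned normally or threw.
      cluster.shutdown();
    }
  }

  /** Runs one test body against a fresh cluster and always shuts the cluster down afterwards. */
  static void withCluster(Consumer<FakeCluster> body) {
    try (ClusterHandle handle = new ClusterHandle()) {
      body.accept(handle.cluster);
    }
  }
}
```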



##########
coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java:
##########
@@ -194,6 +211,83 @@ public void dynamicRemoteStorageTest() throws Exception {
     clientConfManager.close();
   }
 
+  @Test
+  public void dynamicRemoteByHealthStrategyStorageTest() throws Exception {
+    final int updateIntervalSec = 2;
+    final String remotePath1 = "hdfs://host1/path1";
+    final String remotePath2 = "hdfs://host2/path2";
+    final String remotePath3 = "hdfs://host3/path3";
+    File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", ".conf").toFile();
+    cfgFile.deleteOnExit();
+    writeRemoteStorageConf(cfgFile, remotePath1);
+    createMiniHdfs(remotePath.getAbsolutePath());

Review Comment:
   Do we stop the HDFS cluster?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963227608


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.
+ * Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ * If there is an exception when getting the filesystem at the beginning,
+ * roll back to using AppBalanceSelectStorageStrategy.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    try {
+      if (remoteStoragePathRankValue.size() > 1) {
+        for (String path : remoteStoragePathRankValue.keySet()) {
+          Path remotePath = new Path(path);
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          Path testPath = new Path(path + "/rssTest");
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            byte[] data = RandomUtils.nextBytes(fileSize);
+
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) {
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error ", e);
+            RankValue rankValue = remoteStoragePathRankValue.get(path);
+            remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+          } finally {
+            sortPathByIORank(path, testPath, startWriteTime);
+          }
+        }
+      } else {
+        sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+      }
+    } catch (Exception e) {
+      LOG.error("Some error happened, compare the number of apps to select a remote path", e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByIORank(String path, Path testPath, long startWrite) {
+    try {
+      fs.deleteOnExit(testPath);

Review Comment:
   At first, I was worried that calling the NameNode (NN) too frequently would put pressure on it, so I used `deleteOnExit`, which only deletes the file when the FileSystem is closed (for example, when the JVM process exits). I have now tested this on the cluster: with the `delete` operation I can delete 100 MB of data every second without putting pressure on the NN, so I will use `delete` instead.
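   Switching to `delete` means the probe file is removed at the end of every check instead of lingering until the `FileSystem` is closed. A minimal sketch of that write/read/verify/delete round trip. It assumes the local filesystem (`java.nio.file`) as a stand-in for HDFS, while the PR itself uses Hadoop's `FileSystem` API. The class and method names are hypothetical. Reading the file back in one call and comparing whole arrays also sidesteps the per-chunk offset bookkeeping of the quoted read loop.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

final class ProbeWithDeleteSketch {

  /**
   * Writes fileSize random bytes to dir/rssTest, reads them back, compares them,
   * and deletes the probe file immediately (the analogue of FileSystem.delete
   * rather than deleteOnExit). Returns true if the round trip succeeded.
   */
  static boolean probe(Path dir, int fileSize) {
    Path testPath = dir.resolve("rssTest");
    byte[] data = new byte[fileSize];
    ThreadLocalRandom.current().nextBytes(data);
    try {
      Files.write(testPath, data);
      byte[] readBack = Files.readAllBytes(testPath);
      return Arrays.equals(data, readBack);
    } catch (IOException e) {
      // Any read or write error marks the path as failed.
      return false;
    } finally {
      try {
        // Delete right away so probe files never accumulate between checks.
        Files.deleteIfExists(testPath);
      } catch (IOException ignored) {
        // A failed cleanup should not hide the probe result.
      }
    }
  }
}
```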
   





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237642315

   Because the two clusters are currently configured differently to meet different requirements, their read and write speeds are not the same.




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237733070

   Could we implement the HDFS check logic in AppBalanceSelectStorageStrategy?
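   One way to read this suggestion is to keep the app-count balancing of `AppBalanceSelectStorageStrategy` and use the HDFS check only as a filter that excludes broken paths. The sketch below follows that reading. The method name, the plain `Map<String, Integer>` of app counts, and the `unhealthy` set are hypothetical simplifications, not the project's API. Ties are broken by path name so the choice is deterministic.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class BalancedHealthyPicker {

  /**
   * Picks the remote path with the fewest assigned apps, ignoring paths that the
   * most recent health check marked as unhealthy. Returns empty if none is healthy.
   */
  static Optional<String> pick(Map<String, Integer> appCountByPath, Set<String> unhealthy) {
    Comparator<Map.Entry<String, Integer>> fewestAppsThenName =
        Map.Entry.<String, Integer>comparingByValue()
            .thenComparing(Map.Entry.<String, Integer>comparingByKey());
    return appCountByPath.entrySet().stream()
        .filter(e -> !unhealthy.contains(e.getKey()))
        .min(fewestAppsThenName)
        .map(Map.Entry::getKey);
  }
}
```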




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963203554


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.
+ * Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ * If there is an exception when getting the filesystem at the beginning,
+ * roll back to using AppBalanceSelectStorageStrategy.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    try {
+      if (remoteStoragePathRankValue.size() > 1) {
+        for (String path : remoteStoragePathRankValue.keySet()) {
+          Path remotePath = new Path(path);
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          Path testPath = new Path(path + "/rssTest");
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            byte[] data = RandomUtils.nextBytes(fileSize);
+
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) {
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error ", e);
+            RankValue rankValue = remoteStoragePathRankValue.get(path);
+            remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+          } finally {
+            sortPathByIORank(path, testPath, startWriteTime);
+          }
+        }
+      } else {
+        sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+      }
+    } catch (Exception e) {
+      LOG.error("Some error happened, compare the number of apps to select a remote path", e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByIORank(String path, Path testPath, long startWrite) {
+    try {
+      fs.deleteOnExit(testPath);
+      long totalTime = System.currentTimeMillis() - startWrite;
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get()));
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(
+              entry -> entry.getValue().getRatioValue().get())).collect(Collectors.toList());
+    } catch (Exception e) {
+      LOG.error("Failed to delete directory, "
+          + "we should compare the number of apps to select a remote path" + sizeList, e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * the strategy of pick remote storage is according to assignment count
+   */
+  @Override
+  public RemoteStorageInfo pickRemoteStorage(String appId) {
+    if (appIdToRemoteStorageInfo.containsKey(appId)) {
+      return appIdToRemoteStorageInfo.get(appId);
+    }
+
+    for (Map.Entry<String, RankValue> entry : sizeList) {
+      String storagePath = entry.getKey();
+      if (availableRemoteStorageInfo.containsKey(storagePath)) {
+        appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath));
+        incRemoteStorageCounter(storagePath);
+        break;
+      }
+    }
+    return appIdToRemoteStorageInfo.get(appId);
+  }
+
+  @Override
+  @VisibleForTesting
+  public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
+    RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath);
+    if (counter != null) {
+      counter.getAppNum().incrementAndGet();
+    } else {
+      remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1));
+      // this may happen when remote storage is assigned
+      // and refreshed at the same time
+      LOG.warn("Remote storage path lost during assignment: {} doesn't exist, "
+          + "reset the rank value to 0 and app size to 1.", remoteStoragePath);
+    }
+  }
+
+  @Override
+  @VisibleForTesting
+  public synchronized void decRemoteStorageCounter(String storagePath) {
+    if (!StringUtils.isEmpty(storagePath)) {
+      RankValue atomic = remoteStoragePathRankValue.get(storagePath);
+      if (atomic != null) {
+        double count = atomic.getAppNum().decrementAndGet();
+        if (count < 0) {
+          LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0",
+              storagePath, count);
+          atomic.getAppNum().set(0);
+        }
+      } else {
+        remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(1));
+        LOG.warn("Can't find counter for remote storage: {}", storagePath);
+      }
+
+      if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
+          && !availableRemoteStorageInfo.containsKey(storagePath)) {
+        remoteStoragePathRankValue.remove(storagePath);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void removePathFromCounter(String storagePath) {
+    RankValue rankValue = remoteStoragePathRankValue.get(storagePath);
+    // The size of the file cannot be used to determine whether the current path is still used by apps.
+    // Therefore, determine whether the HDFS path is still used by the number of apps
+    if (rankValue != null && rankValue.getAppNum().get() == 0) {
+      remoteStoragePathRankValue.remove(storagePath);
+    }
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
+    return appIdToRemoteStorageInfo;
+  }
+
+  @Override
+  public Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathRankValue;
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo() {
+    return availableRemoteStorageInfo;
+  }
+
+  @VisibleForTesting
+  public void setFs(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @VisibleForTesting
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  static class RankValue {
+    AtomicLong ratioValue;

Review Comment:
   What's the meaning of ratioValue?
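   For context, the quoted code passes the elapsed probe time (`totalTime`) as the first `RankValue` constructor argument and passes `Long.MAX_VALUE` when a probe fails. It then sorts paths by `getRatioValue()`, but the diff does not show how that value is derived, which is what this question is about. The sketch below assumes the ranking key is simply the last probe cost, with the app count as a tie-breaker. `Rank` and `order` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class IoRankSketch {

  /** Hypothetical per-path rank: last probe cost in ms and number of assigned apps. */
  static final class Rank {
    final String path;
    final long costMs;
    final int appNum;

    Rank(String path, long costMs, int appNum) {
      this.path = path;
      this.costMs = costMs;
      this.appNum = appNum;
    }

    /** A failed probe is recorded as Long.MAX_VALUE, mirroring the quoted code. */
    static Rank failed(String path, int appNum) {
      return new Rank(path, Long.MAX_VALUE, appNum);
    }
  }

  /** Orders paths fastest first; failed paths sink to the end; ties go to fewer apps. */
  static List<String> order(List<Rank> ranks) {
    List<Rank> sorted = new ArrayList<>(ranks);
    sorted.sort(Comparator.comparingLong((Rank r) -> r.costMs)
        .thenComparingInt(r -> r.appNum));
    List<String> paths = new ArrayList<>();
    for (Rank r : sorted) {
      paths.add(r.path);
    }
    return paths;
  }
}
```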





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963207572


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.schedule.time")

Review Comment:
   `rss.coordinator.remote.storage.schedule.time` -> `rss.coordinator.health.remote.storage.schedule.time`
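   With the renamed key, enabling the health strategy involves two options. The sketch below reads them from a `java.util.Properties`, which stands in for `CoordinatorConf` (an assumption). The nested enum mirrors the `APP_BALANCE` and `HEALTH` values referenced in the diff, and the fallback matches the diff's `defaultValue(APP_BALANCE)`. The interval is treated as milliseconds because `HealthSelectStorageStrategy` schedules with `TimeUnit.MILLISECONDS`. The diff does not show the interval's default, so the sketch returns an empty `OptionalLong` when it is unset rather than guessing one.

```java
import java.util.OptionalLong;
import java.util.Properties;

final class StrategyConfSketch {

  /** Mirrors the two values of ApplicationManager.StrategyName referenced in the diff. */
  enum StrategyName { APP_BALANCE, HEALTH }

  static final String SELECT_STRATEGY = "rss.coordinator.remote.storage.select.strategy";
  // Key name as suggested in the review comment.
  static final String HEALTH_SCHEDULE_TIME = "rss.coordinator.health.remote.storage.schedule.time";

  /** Reads the strategy, falling back to APP_BALANCE when the key is absent or blank. */
  static StrategyName strategy(Properties props) {
    String raw = props.getProperty(SELECT_STRATEGY);
    if (raw == null || raw.trim().isEmpty()) {
      return StrategyName.APP_BALANCE;
    }
    return StrategyName.valueOf(raw.trim());
  }

  /** Returns the health-check interval in milliseconds, or empty if it is not configured. */
  static OptionalLong healthScheduleTimeMs(Properties props) {
    String raw = props.getProperty(HEALTH_SCHEDULE_TIME);
    return raw == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(raw.trim()));
  }
}
```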





[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964467774


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * When allocating apps to different remote paths, HealthSelectStorageStrategy prefers remote paths
+ * that can be written to and read from. Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) {

Review Comment:
   Although we write the same file each time, its stream has been closed, so a new batch of DataNodes (DNs) should be allocated the next time we write.
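   The repeated access in the quoted loop (`readAndWriteTimes`, read from `COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES`) relies on the point above: each closed stream lets the next write land on a fresh set of DataNodes. The sketch below averages several such round trips. It assumes `java.nio.file` on the local filesystem as a stand-in for HDFS; locally there is no DataNode choice, so only the measurement structure carries over. Names are hypothetical. A failed round trip returns `Long.MAX_VALUE`, matching how the quoted code marks a failed path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

final class RepeatedProbeSketch {

  /**
   * Writes and reads back the same probe file `times` times, opening a fresh stream on
   * every iteration, and returns the average cost per round trip in nanoseconds.
   * Returns Long.MAX_VALUE if any round trip fails, so a broken path ranks last.
   */
  static long averageCostNanos(Path dir, int fileSize, int times) {
    if (times <= 0) {
      throw new IllegalArgumentException("times must be positive");
    }
    Path testPath = dir.resolve("rssTest");
    byte[] data = new byte[fileSize];
    long total = 0;
    try {
      for (int j = 0; j < times; j++) {
        ThreadLocalRandom.current().nextBytes(data);
        long start = System.nanoTime();
        Files.write(testPath, data); // opens, writes, and closes a new stream
        byte[] readBack = Files.readAllBytes(testPath);
        total += System.nanoTime() - start;
        if (!Arrays.equals(data, readBack)) {
          return Long.MAX_VALUE;
        }
      }
      return total / times;
    } catch (IOException e) {
      return Long.MAX_VALUE;
    } finally {
      try {
        Files.deleteIfExists(testPath);
      } catch (IOException ignored) {
        // A cleanup failure does not change the measured result.
      }
    }
  }
}
```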





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1239492099

   Thanks, all. And thank you for your thorough review, @jerqi.




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236505186

   > OK, but we should give the class a better name and add some comments to explain why we need this strategy.
   
   OK, I'll add some comments later. What name do you suggest for this class?




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236521027

   Thanks for your explanation, @colinmjj; you and I are on the same page. I agree with you, @jerqi: `HealthSelectStorageStrategy` seems better than the other names. I will change it later. How about renaming `AppNumSelectStorageStrategy` to `AppBalanceSelectStorageStrategy`?


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233815537

   > Now the core of this PR is to give multiple HDFS paths a certain disaster-tolerance capability. IIRC, HDFS metrics can only be obtained through the REST API, and fetching them over HTTP may not be very stable. WDYT?
   
   Could we split this PR into two pull requests? One to refactor the interface, the other to add the new strategy.
   If we want to avoid an HDFS cluster that is broken, I think we should add a RemoteStorage HealthChecker.
   Why would the HTTP metrics be unstable? If HDFS provides HTTP metrics, it only means that we can't access them too frequently. If I am wrong, please point it out.
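The RemoteStorage HealthChecker idea can be sketched roughly as follows. This is a minimal, hypothetical illustration, not the code that was merged: the class name `RemoteStorageHealthChecker` and the injected `probe` predicate are assumptions, and the probe stands in for whatever write/read check would actually run against each HDFS path.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: the probe is injected so the checker does not depend on any filesystem client.
class RemoteStorageHealthChecker {
  // Returns true if a write/read probe against the given remote path succeeded.
  private final Predicate<String> probe;
  // Replaced wholesale after each round, so readers never see a half-finished check.
  private volatile Map<String, Boolean> healthByPath = new LinkedHashMap<>();

  RemoteStorageHealthChecker(Predicate<String> probe) {
    this.probe = probe;
  }

  // Probe every configured path once; results from the previous round are discarded.
  void checkAll(List<String> paths) {
    Map<String, Boolean> fresh = new LinkedHashMap<>();
    for (String path : paths) {
      boolean healthy;
      try {
        healthy = probe.test(path);
      } catch (RuntimeException e) {
        healthy = false; // a probe that throws marks the path as unhealthy
      }
      fresh.put(path, healthy);
    }
    healthByPath = fresh;
  }

  // Paths that passed the most recent round, in configuration order.
  List<String> healthyPaths() {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Boolean> e : healthByPath.entrySet()) {
      if (e.getValue()) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}
```

A scheduler (such as the `ScheduledExecutorService` already used in the PR) would call `checkAll` periodically, and the selection strategy would only choose among `healthyPaths()`.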
   


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234094157

   > > I think there are many metrics that can directly reflect the pressure of the cluster.
   > 
   > There are indeed some metrics that can reflect the load of the cluster, but isn't the read-write time more critical for these temporary shuffle files?
   
   Em... I can't agree with that. The read-write time is just one sample, and you can't write much data in it, so it's not accurate.


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234100953

   Just my advice: you should verify the feature in your production environment.


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234210668

   If a cluster has more DataNodes, it can provide more I/O capacity, so we should allocate more data to that cluster.
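Allocating data in proportion to DataNode count could be sketched as a weighted random choice, assuming the DataNode count behind each remote path is already known. `DataNodeWeightedSelector` and its methods are hypothetical names for illustration, not part of Uniffle; each path is chosen with probability equal to its share of the total DataNodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: pick a remote path with probability proportional to its DataNode count.
class DataNodeWeightedSelector {
  private final Map<String, Integer> dataNodesByPath = new LinkedHashMap<>();

  void setDataNodeCount(String path, int dataNodes) {
    dataNodesByPath.put(path, dataNodes);
  }

  // 'r' is a uniform random number in [0, 1); passing it in keeps the method deterministic for tests.
  String select(double r) {
    if (r < 0 || r >= 1) {
      throw new IllegalArgumentException("r must be in [0, 1)");
    }
    int total = 0;
    for (int count : dataNodesByPath.values()) {
      total += count;
    }
    if (total <= 0) {
      throw new IllegalStateException("no DataNodes registered");
    }
    // Walk the cumulative weights until the scaled random value falls inside a path's slice.
    double target = r * total;
    double cumulative = 0;
    String last = null;
    for (Map.Entry<String, Integer> e : dataNodesByPath.entrySet()) {
      cumulative += e.getValue();
      last = e.getKey();
      if (target < cumulative) {
        return e.getKey();
      }
    }
    return last; // guards against floating-point rounding when r is very close to 1
  }
}
```

In use, `r` would come from something like `ThreadLocalRandom.current().nextDouble()`. Node count alone ignores disk type and machine model, so the weight is only a proxy for I/O capacity.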
   


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233953744

   I think what you said is reasonable, and I agree to split this into two PRs. At present, the load of a cluster cannot be evaluated perfectly, but the read-write time directly compares the I/O speed of the clusters. Moreover, among the HDFS metrics, there is no metric that directly reflects the pressure on the cluster.


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237772919

   > For us, a larger and faster cluster should receive more data instead of the data being written to the two clusters evenly, and this PR only provides a strategy for that. If we want to modify the original policy, maybe we can add this logic in another PR?
   
   The measurement used by this strategy can't prove that the cluster you select is a larger and faster cluster.


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235051814

   If we use the HDFS metrics system, which metric do you think is the better measure: the length of the `callqueue`, or the ratio of the DataNode load to the number of DataNodes?


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235437969

   Yes, but the load in the HDFS metrics is the sum of the xceiverCount of the DNs, which also enables relatively accurate path selection.
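The load ratio discussed here (total xceivers divided by the number of DataNodes) could be used for path selection roughly as below. The sketch assumes both numbers have already been collected per cluster from HDFS metrics; `XceiverLoadSelector` and `ClusterLoad` are hypothetical names, and the least-loaded path wins.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: rank remote paths by average xceiver count per live DataNode.
class XceiverLoadSelector {
  static final class ClusterLoad {
    final String path;
    final long totalXceivers;  // assumed: sum of xceiverCount over the cluster's DataNodes
    final int liveDataNodes;   // assumed: number of live DataNodes in the cluster

    ClusterLoad(String path, long totalXceivers, int liveDataNodes) {
      this.path = path;
      this.totalXceivers = totalXceivers;
      this.liveDataNodes = liveDataNodes;
    }

    // Average xceivers per DataNode; a cluster with no live DataNodes is treated as unusable.
    double loadPerDataNode() {
      return liveDataNodes > 0 ? (double) totalXceivers / liveDataNodes : Double.POSITIVE_INFINITY;
    }
  }

  // Pick the path whose DataNodes are, on average, serving the fewest xceivers.
  static String selectLeastLoaded(List<ClusterLoad> clusters) {
    return clusters.stream()
        .min(Comparator.comparingDouble(ClusterLoad::loadPerDataNode))
        .map(c -> c.path)
        .orElseThrow(() -> new IllegalStateException("no clusters"));
  }
}
```

How well this ratio tracks real pressure depends on how the DataNode schedules transfers, which is the caveat raised later in this thread.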


[GitHub] [incubator-uniffle] zuston commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1231906496

   Before reviewing this PR, I have some questions:
   1. How do we disable this strategy in the conf? If we only specify one path, for example a single s3a path, there is no need for storage selection.
   2. The I/O cost time may be meaningless if the rank is built from a single w/r time. But I think it's OK to check whether the DFS namespace is healthy this way.
   3. Can you share some requirements from your production environment? From my side, DFS can serve as a big and stable enough store for shuffle data.


[GitHub] [incubator-uniffle] zuston commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233991602

   Thanks for your reply, @smallzhongfeng.
   > At present, we want to support multiple clusters, but the pressure on each cluster is different. We want to select the cluster with the least pressure to write data to, instead of writing data to each cluster evenly. In fact, as we understand it, configuring multiple HDFS paths can also help with disaster recovery.
   
   Got it. Inside our company, we have a router filesystem that provides fault tolerance between namespaces, so we don't have such a requirement.
   
   > Moreover, among the HDFS metrics, there seems to be no metric that directly reflects the pressure on the cluster.
   
   Can we implement an HDFS checker that checks whether the w/r time exceeds a threshold?
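A threshold-based HDFS checker of the kind suggested here might look like the following minimal sketch. `ThresholdHealthChecker` and its methods are hypothetical; the probe that measures `costMs` is assumed to run elsewhere, for example a write/read round like the one in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a path is healthy only if its latest probe succeeded within the threshold.
class ThresholdHealthChecker {
  private final long thresholdMs;
  private final Map<String, Boolean> healthByPath = new HashMap<>();

  ThresholdHealthChecker(long thresholdMs) {
    this.thresholdMs = thresholdMs;
  }

  // Record a probe that completed, together with how long the w/r round took.
  synchronized void recordSuccess(String path, long costMs) {
    healthByPath.put(path, costMs <= thresholdMs);
  }

  // Record a probe that failed outright (for example with an IOException).
  synchronized void recordFailure(String path) {
    healthByPath.put(path, false);
  }

  // Paths that were never probed are treated as unhealthy until their first check completes.
  synchronized boolean isHealthy(String path) {
    return healthByPath.getOrDefault(path, false);
  }
}
```

The open question in the thread remains how to pick a threshold that is reasonable across clusters of different sizes.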
   


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234174895

   Have you measured the performance of the two clusters? Is the data written to them proportional to the number of nodes?


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234331061

   I think this comparison may be a bit unreasonable, because performance also depends on the machine model, disk type, and other factors; we can't say that a machine with 80 SSDs is necessarily slower than a machine with 200 HDDs. The most direct way may be to read and write files, which directly reflects the pressure on the DataNodes at that time.


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235348175

   Do you mean the xceiverCount? If a DN uses a fixed-size thread pool, the thread count won't represent the load of the DN. It depends on the DN implementation.


[GitHub] [incubator-uniffle] zuston commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234056257

   > It seems difficult to determine a reasonable threshold, so it may be better to directly compare the time consumption of different paths. @zuston
   
   Time consumption is actually not meaningful. For example, one namespace costs 10 s and another costs 10.1 s.
   From my side, there is no difference between the two namespaces' performance.
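One way to encode the point that 10 s versus 10.1 s is no real difference is a relative-tolerance comparison. This is a hypothetical sketch (`CostComparison.compareWithTolerance` and the tolerance value are assumptions), not something the PR implements.

```java
// Hypothetical sketch: two I/O costs count as equivalent when their relative difference is small.
final class CostComparison {
  private CostComparison() {}

  // Negative if 'a' is meaningfully faster, positive if 'b' is, 0 if they are within tolerance.
  static int compareWithTolerance(double a, double b, double relativeTolerance) {
    double larger = Math.max(a, b);
    if (larger == 0 || Math.abs(a - b) / larger <= relativeTolerance) {
      return 0;
    }
    return Double.compare(a, b);
  }
}
```

Because a tolerance-based comparison is not transitive, it suits pairwise tie-breaking (for example, falling back to app-count balancing when two paths are within tolerance) rather than use as a `Comparator` for sorting.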
   
   


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234058380

   > Time consumption is actually not meaningful. For example, one namespace costs 10 s and another costs 10.1 s. From my side, there is no difference between the two namespaces' performance.
   
   Yes. Because the value obtained at present is instantaneous, I have replaced it with the mean value over a period of time.
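Replacing the instantaneous value with a mean over a period of time can be sketched with a fixed-size sliding window per path. The window size and the class name `SlidingWindowMean` are assumptions for illustration; the merged code may compute its mean differently.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: keep the last N I/O cost samples for one path and expose their mean.
class SlidingWindowMean {
  private final int capacity;
  private final Deque<Long> samples = new ArrayDeque<>();
  private long sum;

  SlidingWindowMean(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  synchronized void add(long costMs) {
    samples.addLast(costMs);
    sum += costMs;
    if (samples.size() > capacity) {
      sum -= samples.removeFirst(); // drop the oldest sample once the window is full
    }
  }

  // NaN until the first sample arrives, so callers can tell "no data" from "fast".
  synchronized double mean() {
    return samples.isEmpty() ? Double.NaN : (double) sum / samples.size();
  }
}
```

Each path would own one window, and the ranking would compare `mean()` values instead of the latest sample, so a single slow or fast probe does not decide the order.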


[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964479162


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write and read. Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) {

Review Comment:
   ![image](https://user-images.githubusercontent.com/84573424/188815077-11ef41e0-9b77-4569-bafc-93bc7319fa7e.png)
   



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962598886


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -125,7 +125,21 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<String> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY = ConfigOptions
+      .key("rss.coordinator.remote.storage.select.strategy")
+      .stringType()
+      .defaultValue("IO")

Review Comment:
   > Of course, you can let users choose by themselves, but I actually prefer the IO strategy, because it avoids some risks better than the APP strategy.
   
   In our production environment, one HDFS cluster can't process all the requests from the RSS shuffle servers, so we need to allocate applications to multiple HDFS clusters by application count.
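Allocating applications by app count amounts to sending each new application to the path with the fewest active applications and pinning it there. A minimal sketch with a hypothetical `AppCountBalancer` class (not Uniffle's actual strategy class):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of app-count balancing across remote storage paths.
class AppCountBalancer {
  private final Map<String, String> pathByAppId = new HashMap<>();
  private final Map<String, Integer> appCountByPath = new LinkedHashMap<>();

  AppCountBalancer(List<String> paths) {
    for (String path : paths) {
      appCountByPath.put(path, 0);
    }
  }

  synchronized String pickFor(String appId) {
    String existing = pathByAppId.get(appId);
    if (existing != null) {
      return existing; // all shuffle data of one application stays on one remote storage
    }
    String chosen = appCountByPath.entrySet().stream()
        .min(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey)
        .orElseThrow(() -> new IllegalStateException("no remote paths configured"));
    appCountByPath.merge(chosen, 1, Integer::sum);
    pathByAppId.put(appId, chosen);
    return chosen;
  }

  // Called when an application finishes, so its path becomes available to new applications.
  synchronized void release(String appId) {
    String path = pathByAppId.remove(appId);
    if (path != null) {
      appCountByPath.merge(path, -1, Integer::sum);
    }
  }
}
```

Unlike the I/O-sample strategy, this keeps the number of applications per cluster even regardless of measured speed.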



[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237637467

   > > The comparison of IO samples isn't meaningful. Should we remove it?
   > 
   > I'd say the read and write time has some reference value. For us, large clusters write faster than small clusters.
   
   Em... with just one W/R sample, why would the large clusters be faster than the small ones? Are they under the same request pressure?


[GitHub] [incubator-uniffle] jerqi merged pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi merged PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192


[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237598165

   Is `IOSampleSelectStorageStrategy` a better name? `HealthSelectStorageStrategy` is a little general.


[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964467774


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) {

Review Comment:
   Although we write to the same file each time, its stream has been closed, so a new set of DataNodes should be allocated the next time we write.
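   A minimal sketch of the probe being discussed: write a fixed-size random payload, read it back, verify it, and time the round trip, repeated `times` times. It uses local files through `java.nio.file` as a stand-in for the Hadoop `FileSystem`, and the class and method names are hypothetical. A failed or mismatched round trip returns `Long.MAX_VALUE` so the path sorts last.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Random;

public class IoProbeSketch {

  /**
   * Hypothetical helper: writes and reads back a random payload `times` times under `dir`
   * and returns the elapsed milliseconds, or Long.MAX_VALUE on any I/O error or data mismatch.
   * Local files stand in for HDFS here.
   */
  public static long probe(Path dir, int fileSize, int times) {
    Path testFile = dir.resolve("rssTest");
    byte[] data = new byte[fileSize];
    new Random().nextBytes(data);
    long start = System.nanoTime();
    try {
      for (int i = 0; i < times; i++) {
        // Each iteration opens a fresh output stream; on HDFS, per the comment above,
        // a new stream may be placed on a different set of DataNodes.
        Files.write(testFile, data);
        byte[] readBack = Files.readAllBytes(testFile);
        if (!Arrays.equals(data, readBack)) {
          return Long.MAX_VALUE; // corrupted read: rank this path last
        }
      }
      return (System.nanoTime() - start) / 1_000_000L;
    } catch (IOException e) {
      return Long.MAX_VALUE; // unwritable or unreadable path: rank it last
    } finally {
      try {
        Files.deleteIfExists(testFile);
      } catch (IOException ignored) {
        // best-effort cleanup of the probe file
      }
    }
  }
}
```

   Returning the failure value directly means no later timing update can overwrite it. In the 272-line revision quoted further down, the `finally` block's `sortPathByIORank` call appears to store the elapsed time after the mismatch branch has stored `Long.MAX_VALUE`.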



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1234053136

   If we only collect the write speed of the path the shuffle servers are already writing to, the other paths may never be written to.
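   A toy simulation of this concern, with hypothetical names: if speeds are learned only from the path an app actually writes to, a greedy picker that prefers the fastest known path never measures the others. Treating unmeasured paths as slower than any measured one is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PassiveSamplingSketch {

  /**
   * Runs `rounds` greedy picks where only the chosen path's cost is measured.
   * Returns how many times each path was chosen.
   */
  static Map<String, Integer> simulate(List<String> paths, Map<String, Long> trueCostMs, int rounds) {
    Map<String, Long> known = new HashMap<>();    // costs learned so far
    Map<String, Integer> picks = new HashMap<>();
    for (int i = 0; i < rounds; i++) {
      String best = paths.get(0);
      for (String p : paths) {
        // Unmeasured paths default to Long.MAX_VALUE, i.e. "slower than anything measured".
        if (known.getOrDefault(p, Long.MAX_VALUE) < known.getOrDefault(best, Long.MAX_VALUE)) {
          best = p;
        }
      }
      known.put(best, trueCostMs.get(best)); // passive: only the path actually written is sampled
      picks.merge(best, 1, Integer::sum);
    }
    return picks;
  }
}
```

   With `hdfs://host1/path1` costing 100 ms and `hdfs://host2/path2` costing 10 ms, every pick lands on `path1` because `path2` is never sampled. Actively probing every configured path, as `checkReadAndWrite` does, avoids this because each path is measured whether or not it was picked.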



[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1235234246

   Maybe we can judge the load of a cluster by the number of threads each DataNode is using to read and write files. The general idea is as follows:
   ```
   DistributedFileSystem dfs = (DistributedFileSystem) fs;
   DatanodeStorageReport[] datanodeStorageReports =
           dfs.getClient().getDatanodeStorageReport(HdfsConstants.DatanodeReportType.LIVE);
   ```
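   Following that idea, one hypothetical way to turn per-DataNode thread counts into a cluster load score is to average them and pick the cluster with the lowest average. In a real deployment the counts might come from `DatanodeInfo.getXceiverCount()` on each report's `getDatanodeInfo()`; that is an assumption, not code from this PR. Here they are passed in as plain arrays so the sketch stays self-contained.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

public class XceiverLoadSketch {

  /** Average active transfer threads per DataNode; a cluster with no live DataNodes counts as fully loaded. */
  static double averageLoad(int[] xceiverCounts) {
    return Arrays.stream(xceiverCounts).average().orElse(Double.MAX_VALUE);
  }

  /** Returns the remote path whose cluster reports the lowest average thread count. */
  static Optional<String> leastLoaded(Map<String, int[]> clusterToXceiverCounts) {
    return clusterToXceiverCounts.entrySet().stream()
        .min((a, b) -> Double.compare(averageLoad(a.getValue()), averageLoad(b.getValue())))
        .map(Map.Entry::getKey);
  }
}
```

   Averaging rather than summing keeps a large cluster from looking "busier" just because it has more DataNodes.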



[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1233799159

   The core goal of this PR is to give multiple HDFS paths a degree of disaster tolerance. IIRC, HDFS metrics can only be obtained through the REST API, and fetching them over HTTP may not be very stable. WDYT?



[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1232664775

   Thank you for your questions. My answers are as follows:
   1. If the parameter `rss.coordinator.remote.storage.select.strategy` is set to `APP`, it will be the original selection strategy.
   2. This strategy is adopted so that, when an HDFS path under one of the namespaces throws exceptions, we can avoid selecting that path.
   3. We want to support multiple clusters, but each cluster is under different pressure. We want to write data to the cluster with the least pressure, instead of spreading data evenly across all clusters. In our understanding, configuring multiple HDFS paths can also provide some degree of disaster recovery.
   @jerqi @zuston 
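   The two modes in these answers can be sketched as a choice of comparator over a per-path rank. This is a simplified, hypothetical model: `PathRank` stands in for the PR's `RankValue`, `ioCostMs` is the latest probe time (`Long.MAX_VALUE` after a read/write failure), and `appNum` is the number of apps assigned. `APP` sorts by app count; `IO` sorts by probe time, so a failing path sorts last and the fastest path may receive every new app.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SelectStrategySketch {

  enum Mode { APP, IO }

  /** Hypothetical stand-in for the PR's RankValue. */
  static final class PathRank {
    final String path;
    final long ioCostMs;  // latest probe time; Long.MAX_VALUE marks a failed probe
    final int appNum;     // apps currently assigned to this path

    PathRank(String path, long ioCostMs, int appNum) {
      this.path = path;
      this.ioCostMs = ioCostMs;
      this.appNum = appNum;
    }
  }

  /** Picks the path for a new app: fewest apps under APP, cheapest probe under IO. */
  static String pick(List<PathRank> ranks, Mode mode) {
    Comparator<PathRank> order = mode == Mode.APP
        ? Comparator.comparingInt((PathRank r) -> r.appNum)
        : Comparator.comparingLong((PathRank r) -> r.ioCostMs);
    List<PathRank> sorted = new ArrayList<>(ranks);
    sorted.sort(order);
    return sorted.isEmpty() ? null : sorted.get(0).path;
  }
}
```

   Because `IO` ignores `appNum`, repeated picks keep landing on the same fast path until the next probe changes the order, which matches the point that the least-pressured cluster is preferred over an even spread.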



[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963237867


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.
+ * Therefore, it may occur that all apps are written to the same cluster.
+ * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster.
+ * If there is an exception when getting the filesystem at the beginning,
+ * fall back to using AppBalanceSelectStorageStrategy.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    try {
+      if (remoteStoragePathRankValue.size() > 1) {
+        for (String path : remoteStoragePathRankValue.keySet()) {
+          Path remotePath = new Path(path);
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          Path testPath = new Path(path + "/rssTest");
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            byte[] data = RandomUtils.nextBytes(fileSize);
+
+            try (FSDataOutputStream fos = fs.create(testPath)) {
+              fos.write(data);
+              fos.flush();
+            }
+            byte[] readData = new byte[fileSize];
+            int readBytes;
+            try (FSDataInputStream fis = fs.open(testPath)) {
+              int hasReadBytes = 0;
+              do {
+                readBytes = fis.read(readData);
+                if (hasReadBytes < fileSize) {
+                  for (int i = 0; i < readBytes; i++) {
+                    if (data[hasReadBytes + i] != readData[i]) {
+                      RankValue rankValue = remoteStoragePathRankValue.get(path);
+                      remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+                    }
+                  }
+                }
+                hasReadBytes += readBytes;
+              } while (readBytes != -1);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error ", e);
+            RankValue rankValue = remoteStoragePathRankValue.get(path);
+            remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get()));
+          } finally {
+            sortPathByIORank(path, testPath, startWriteTime);
+          }
+        }
+      } else {
+        sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+      }
+    } catch (Exception e) {
+      LOG.error("Some error happened, compare the number of apps to select a remote path", e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByIORank(String path, Path testPath, long startWrite) {
+    try {
+      fs.deleteOnExit(testPath);
+      long totalTime = System.currentTimeMillis() - startWrite;
+      RankValue rankValue = remoteStoragePathRankValue.get(path);
+      remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get()));
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(
+              entry -> entry.getValue().getRatioValue().get())).collect(Collectors.toList());
+    } catch (Exception e) {
+      LOG.error("Failed to delete directory, "
+          + "we should compare the number of apps to select a remote path" + sizeList, e);
+      sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull)
+          .sorted(Comparator.comparingDouble(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * pick the first available remote storage in sizeList order (ranked by I/O cost, or by app count as a fallback)
+   */
+  @Override
+  public RemoteStorageInfo pickRemoteStorage(String appId) {
+    if (appIdToRemoteStorageInfo.containsKey(appId)) {
+      return appIdToRemoteStorageInfo.get(appId);
+    }
+
+    for (Map.Entry<String, RankValue> entry : sizeList) {
+      String storagePath = entry.getKey();
+      if (availableRemoteStorageInfo.containsKey(storagePath)) {
+        appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath));
+        incRemoteStorageCounter(storagePath);
+        break;
+      }
+    }
+    return appIdToRemoteStorageInfo.get(appId);
+  }
+
+  @Override
+  @VisibleForTesting
+  public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
+    RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath);
+    if (counter != null) {
+      counter.getAppNum().incrementAndGet();
+    } else {
+      remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1));
+      // this may happen when a remote storage path is assigned
+      // while the remote storage is being refreshed
+      LOG.warn("Remote storage path lost during assignment: {} doesn't exist, "
+          + "reset the rank value to 0 and app size to 1.", remoteStoragePath);
+    }
+  }
+
+  @Override
+  @VisibleForTesting
+  public synchronized void decRemoteStorageCounter(String storagePath) {
+    if (!StringUtils.isEmpty(storagePath)) {
+      RankValue atomic = remoteStoragePathRankValue.get(storagePath);
+      if (atomic != null) {
+        double count = atomic.getAppNum().decrementAndGet();
+        if (count < 0) {
+          LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0",
+              storagePath, count);
+          atomic.getAppNum().set(0);
+        }
+      } else {
+        remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(1));
+        LOG.warn("Can't find counter for remote storage: {}", storagePath);
+      }
+
+      if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
+          && !availableRemoteStorageInfo.containsKey(storagePath)) {
+        remoteStoragePathRankValue.remove(storagePath);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void removePathFromCounter(String storagePath) {
+    RankValue rankValue = remoteStoragePathRankValue.get(storagePath);
+    // The size of the file cannot be used to determine whether the current path is still used by apps.
+    // Therefore, determine whether the HDFS path is still used by the number of apps
+    if (rankValue != null && rankValue.getAppNum().get() == 0) {
+      remoteStoragePathRankValue.remove(storagePath);
+    }
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAppIdToRemoteStorageInfo() {
+    return appIdToRemoteStorageInfo;
+  }
+
+  @Override
+  public Map<String, RankValue> getRemoteStoragePathRankValue() {
+    return remoteStoragePathRankValue;
+  }
+
+  @Override
+  public Map<String, RemoteStorageInfo> getAvailableRemoteStorageInfo() {
+    return availableRemoteStorageInfo;
+  }
+
+  @VisibleForTesting
+  public void setFs(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @VisibleForTesting
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  static class RankValue {
+    AtomicLong ratioValue;

Review Comment:
   Changed the name.




[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236114668

   I understand what you mean. For the shuffle server itself, load balancing is critical, otherwise the server's load will increase. But here we only write temporary shuffle data, and only the HDFS DataNodes are under pressure. I don't think DataNode load needs to be weighed the same way as shuffle server load: even if all apps write to the same path, the DataNodes will not be under much pressure. We just want to choose a faster cluster to write data to. And when DataNode load is very high, it will also show up as longer read and write times.



[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236510160

   > > OK, but we should give the class another better name and add some comments to explain why we need this strategy.
   > 
   > OK. I'll add some comments later. What name do you suggest for this class?
   
   `AvailabilitySelectStorageStrategy` or `HealthSelectStorageStrategy`, but I don't think these names are good enough.



[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1236770081

   > Thanks for your explanation, you and I are on the same page. @colinmjj I agree with you @jerqi, `HealthSelectStorageStrategy` seems better than the others. I will change it later. What about renaming `AppNumSelectStorageStrategy` to `AppBalanceSelectStorageStrategy`?
   
   `AppBalanceSelectStorageStrategy` is ok for me.



[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237628365

   > Comparing IO samples isn't meaningful. Should we remove it?
   
   I'd say the read and write time does have some reference value. In our environment, large clusters write faster than small clusters.



[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1239293617

   > ### What changes were proposed in this pull request?
   > Solve #186
   > 
   > ### Why are the changes needed?
   > A better strategy to select remote path.
   > 
   > ### Does this PR introduce _any_ user-facing change?
   > Three configs added.
   > 
   > 1. `rss.coordinator.remote.storage.select.strategy` supports `APP` and `IO`: `APP` selects based on the number of apps, `IO` selects based on the time taken to read and write files.
   > 2. `rss.coordinator.remote.storage.schedule.time`: if the user chooses `IO`, a test file is read and written at this interval.
   > 3. `rss.coordinator.remote.storage.file.size`: the size of the file used in each read/write test.
   > 
   > ### How was this patch tested?
   > Added ut.
   
   Please update the description.
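   The three settings in the quoted description could be read into a typed holder roughly as follows. This is a hypothetical sketch: the key strings are the ones listed in the description (later revisions may rename them), the millisecond unit follows the `TimeUnit.MILLISECONDS` scheduling in the quoted diff, and the byte unit follows `RandomUtils.nextBytes(fileSize)`. No defaults are given in this thread, so missing keys raise an error instead of being guessed.

```java
import java.util.Locale;
import java.util.Properties;

public class RemoteStorageSelectConfSketch {

  enum Strategy { APP, IO }

  final Strategy strategy;
  final long scheduleTimeMs;  // probe interval, only meaningful when strategy == IO
  final int fileSizeBytes;    // size of each probe file

  RemoteStorageSelectConfSketch(Properties props) {
    // Enum.valueOf throws IllegalArgumentException for values other than APP or IO.
    this.strategy = Strategy.valueOf(
        require(props, "rss.coordinator.remote.storage.select.strategy").trim().toUpperCase(Locale.ROOT));
    this.scheduleTimeMs = Long.parseLong(
        require(props, "rss.coordinator.remote.storage.schedule.time").trim());
    this.fileSizeBytes = Integer.parseInt(
        require(props, "rss.coordinator.remote.storage.file.size").trim());
  }

  private static String require(Properties props, String key) {
    String value = props.getProperty(key);
    if (value == null) {
      // Defaults are not stated in this thread, so the sketch refuses to guess.
      throw new IllegalArgumentException("Missing config: " + key);
    }
    return value;
  }
}
```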



[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r962944698


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy considers that when allocating apps to different remote paths,
+ * remote paths that can write data faster should be allocated as much as possible.

Review Comment:
   OK, so it would be more accurate to say "the more stable path", right?




[GitHub] [incubator-uniffle] jerqi commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1237611328

   Comparing IO samples isn't meaningful. Should we remove it?



[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r963228880


##########
coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java:
##########
@@ -194,6 +211,83 @@ public void dynamicRemoteStorageTest() throws Exception {
     clientConfManager.close();
   }
 
+  @Test
+  public void dynamicRemoteByHealthStrategyStorageTest() throws Exception {
+    final int updateIntervalSec = 2;
+    final String remotePath1 = "hdfs://host1/path1";
+    final String remotePath2 = "hdfs://host2/path2";
+    final String remotePath3 = "hdfs://host3/path3";
+    File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", ".conf").toFile();
+    cfgFile.deleteOnExit();
+    writeRemoteStorageConf(cfgFile, remotePath1);
+    createMiniHdfs(remotePath.getAbsolutePath());

Review Comment:
   Yes, this needs to be turned off.



##########
coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java:
##########
@@ -194,6 +211,83 @@ public void dynamicRemoteStorageTest() throws Exception {
     clientConfManager.close();
   }
 
+  @Test
+  public void dynamicRemoteByHealthStrategyStorageTest() throws Exception {
+    final int updateIntervalSec = 2;
+    final String remotePath1 = "hdfs://host1/path1";
+    final String remotePath2 = "hdfs://host2/path2";
+    final String remotePath3 = "hdfs://host3/path3";
+    File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", ".conf").toFile();
+    cfgFile.deleteOnExit();
+    writeRemoteStorageConf(cfgFile, remotePath1);
+    createMiniHdfs(remotePath.getAbsolutePath());

Review Comment:
   Yes, this needs to be closed.




[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
smallzhongfeng commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964474962


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/HealthSelectStorageStrategy.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * HealthSelectStorageStrategy only considers remote paths that can currently be written to and read from
+ * when assigning apps to remote paths. As a result, all apps may end up being written to the same cluster.
+ * If a cluster encounters read or write exceptions, the strategy automatically avoids it.
+ */
+public class HealthSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HealthSelectStorageStrategy.class);
+  /**
+   * store appId -> remote path to make sure all shuffle data of the same application
+   * will be written to the same remote storage
+   */
+  private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+  /**
+   * store remote path -> application count for assignment strategy
+   */
+  private final Map<String, RankValue> remoteStoragePathRankValue;
+  private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+  private List<Map.Entry<String, RankValue>> sizeList;
+  private FileSystem fs;
+  private Configuration conf;
+  private final int fileSize;
+  private final int readAndWriteTimes;
+
+  public HealthSelectStorageStrategy(CoordinatorConf cf) {
+    conf = new Configuration();
+    fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_FILE_SIZE);
+    readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES);
+    this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+    this.remoteStoragePathRankValue = Maps.newConcurrentMap();
+    this.availableRemoteStorageInfo = Maps.newHashMap();
+    this.sizeList = Lists.newCopyOnWriteArrayList();
+    ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+    // should init later than the refreshRemoteStorage init
+    readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+        cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  public void checkReadAndWrite() {
+    if (remoteStoragePathRankValue.size() > 1) {
+      for (String path : remoteStoragePathRankValue.keySet()) {
+        Path remotePath = new Path(path);
+        Path testPath = new Path(path + "/rssTest");
+        long startWriteTime = System.currentTimeMillis();
+        try {
+          fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
+          for (int j = 0; j < readAndWriteTimes; j++) {

Review Comment:
   ![image](https://user-images.githubusercontent.com/84573424/188814033-d61b5820-db6e-40cc-824d-f2e46b03268c.png)
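The truncated loop in `checkReadAndWrite()` probes each remote path `readAndWriteTimes` times. As a rough illustration of the idea described in the class Javadoc (time a few write/read round trips per path, drop paths that fail, prefer the cheapest), here is a minimal self-contained sketch. It is not the PR's implementation: local directories accessed through `java.nio.file.Files` stand in for the Hadoop `FileSystem`, and the names `IoCostProbe`, `probe`, `rank` and `UNHEALTHY` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: local directories stand in for remote HDFS paths.
public class IoCostProbe {

  /** Cost assigned to a path whose probe failed; such paths are never selected. */
  static final long UNHEALTHY = Long.MAX_VALUE;

  /**
   * Writes and reads back a file of fileSize bytes accessTimes times under dir.
   * Returns the elapsed milliseconds, or UNHEALTHY if any I/O step fails.
   */
  static long probe(Path dir, int fileSize, int accessTimes) {
    Path testFile = dir.resolve("rssTest");
    byte[] data = new byte[fileSize];
    long start = System.nanoTime(); // monotonic clock for elapsed-time measurement
    try {
      for (int i = 0; i < accessTimes; i++) {
        Files.write(testFile, data);
        if (Files.readAllBytes(testFile).length != fileSize) {
          return UNHEALTHY; // short read: treat the path as unhealthy
        }
      }
      return (System.nanoTime() - start) / 1_000_000;
    } catch (IOException e) {
      return UNHEALTHY; // any read/write exception marks the path as unhealthy
    } finally {
      try {
        Files.deleteIfExists(testFile); // best-effort cleanup of the probe file
      } catch (IOException ignored) {
        // nothing else to do if cleanup fails
      }
    }
  }

  /** Healthy paths ordered from lowest to highest measured cost. */
  static List<String> rank(Map<String, Long> costs) {
    return costs.entrySet().stream()
        .filter(e -> e.getValue() != UNHEALTHY)
        .sorted(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    Path healthy = Files.createTempDirectory("remote1");
    Path missing = healthy.resolveSibling(healthy.getFileName() + "-missing");
    Map<String, Long> costs = new LinkedHashMap<>();
    costs.put(healthy.toString(), probe(healthy, 1024, 3));
    costs.put(missing.toString(), probe(missing, 1024, 3));
    System.out.println(rank(costs)); // only the existing directory is listed
    Files.delete(healthy);
  }
}
```

Sorting ascending by elapsed time means the fastest healthy path always comes first, which matches the Javadoc's caveat that all apps may land on the same cluster.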
   





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#discussion_r964750605


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.health.schedule.time")

Review Comment:
   io sample



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.health.schedule.time")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("Interval in milliseconds for sampling the read and write time of each remote storage path");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_FILE_SIZE = ConfigOptions
+      .key("rss.coordinator.remote.storage.file.size")

Review Comment:
   io sample



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.health.schedule.time")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("Interval in milliseconds for sampling the read and write time of each remote storage path");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_FILE_SIZE = ConfigOptions
+      .key("rss.coordinator.remote.storage.file.size")
+      .intType()
+      .defaultValue(204800 * 1000)
+      .withDescription("The size of the file that the scheduled thread reads and writes");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES = ConfigOptions
+      .key("rss.coordinator.remote.storage.access.times")
+      .intType()
+      .defaultValue(3)
+      .withDescription("The number of times to read and write HDFS files");

Review Comment:
   Could we add the new configuration options to the documentation?
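For the documentation, it may help to state what the defaults above amount to. The sketch below assumes that `rss.coordinator.remote.storage.file.size` is in bytes and that each of the `access.times` iterations writes and then reads one file of that size; neither is confirmed by the diff. The interval is in milliseconds, since the constructor passes it to `scheduleAtFixedRate` with `TimeUnit.MILLISECONDS`. The class name `ProbeBudget` is hypothetical.

```java
// Hypothetical helper: spells out what the default values of the new options amount to.
// Assumes file.size is in bytes and that each access writes and then reads one such file.
public class ProbeBudget {

  static final long SCHEDULE_MS = 60 * 1000L;  // rss.coordinator.remote.storage.health.schedule.time
  static final int FILE_SIZE = 204800 * 1000;  // rss.coordinator.remote.storage.file.size
  static final int ACCESS_TIMES = 3;           // rss.coordinator.remote.storage.access.times

  /** Bytes written (and the same amount read) per remote path in one probe round. */
  static long bytesPerRound(int fileSize, int accessTimes) {
    return (long) fileSize * accessTimes; // widen first so large settings cannot overflow
  }

  public static void main(String[] args) {
    System.out.println("file size: " + FILE_SIZE + " bytes");                                  // 204800000 bytes
    System.out.println("per round: " + bytesPerRound(FILE_SIZE, ACCESS_TIMES) + " bytes/path"); // 614400000 bytes/path
    System.out.println("interval:  " + SCHEDULE_MS / 1000 + " s");                             // 60 s
  }
}
```

Under those assumptions, the defaults write roughly 614 MB (and read the same amount) per remote path every minute. Because the file size is declared with `intType()`, it also cannot exceed `Integer.MAX_VALUE` (2,147,483,647).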



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java:
##########
@@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf {
       .stringType()
       .noDefaultValue()
       .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'");
-
+  public static final ConfigOption<ApplicationManager.StrategyName> COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY =
+      ConfigOptions.key("rss.coordinator.remote.storage.select.strategy")
+      .enumType(ApplicationManager.StrategyName.class)
+      .defaultValue(APP_BALANCE)
+      .withDescription("Strategy for selecting the remote path");
+  public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
+      .key("rss.coordinator.remote.storage.health.schedule.time")
+      .longType()
+      .defaultValue(60 * 1000L)
+      .withDescription("Interval in milliseconds for sampling the read and write time of each remote storage path");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_FILE_SIZE = ConfigOptions
+      .key("rss.coordinator.remote.storage.file.size")
+      .intType()
+      .defaultValue(204800 * 1000)
+      .withDescription("The size of the file that the scheduled thread reads and writes");
+  public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES = ConfigOptions
+      .key("rss.coordinator.remote.storage.access.times")

Review Comment:
   io sample





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #192: [ISSUE-186][Feature] Use I/O cost time to select storage paths

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #192:
URL: https://github.com/apache/incubator-uniffle/pull/192#issuecomment-1231404947

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/192?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#192](https://codecov.io/gh/apache/incubator-uniffle/pull/192?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (83fbe69) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/e1b9b80e506eb5079c16cbb71096896f2f647b84?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e1b9b80) will **increase** coverage by `0.53%`.
   > The diff coverage is `86.97%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #192      +/-   ##
   ============================================
   + Coverage     58.45%   58.99%   +0.53%     
   - Complexity     1272     1309      +37     
   ============================================
     Files           158      160       +2     
     Lines          8437     8596     +159     
     Branches        782      798      +16     
   ============================================
   + Hits           4932     5071     +139     
   - Misses         3254     3266      +12     
   - Partials        251      259       +8     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/192?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...iffle/coordinator/AppNumSelectStorageStrategy.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvQXBwTnVtU2VsZWN0U3RvcmFnZVN0cmF0ZWd5LmphdmE=) | `72.34% <72.34%> (ø)` | |
   | [...apache/uniffle/coordinator/ApplicationManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvQXBwbGljYXRpb25NYW5hZ2VyLmphdmE=) | `90.17% <89.47%> (+6.47%)` | :arrow_up: |
   | [...iffle/coordinator/IOCostSelectStorageStrategy.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvSU9Db3N0U2VsZWN0U3RvcmFnZVN0cmF0ZWd5LmphdmE=) | `90.99% <90.99%> (ø)` | |
   | [...rg/apache/uniffle/coordinator/CoordinatorConf.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29vcmRpbmF0b3Ivc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3VuaWZmbGUvY29vcmRpbmF0b3IvQ29vcmRpbmF0b3JDb25mLmphdmE=) | `96.82% <100.00%> (+0.42%)` | :arrow_up: |
   | [...org/apache/uniffle/server/LocalStorageChecker.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9Mb2NhbFN0b3JhZ2VDaGVja2VyLmphdmE=) | `67.74% <0.00%> (-1.83%)` | :arrow_down: |
   | [...rg/apache/uniffle/storage/common/LocalStorage.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2NvbW1vbi9Mb2NhbFN0b3JhZ2UuamF2YQ==) | `43.66% <0.00%> (-1.02%)` | :arrow_down: |
   | [...org/apache/uniffle/server/ShuffleFlushManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlRmx1c2hNYW5hZ2VyLmphdmE=) | `78.33% <0.00%> (-1.00%)` | :arrow_down: |
   | [...java/org/apache/uniffle/common/config/RssConf.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb25maWcvUnNzQ29uZi5qYXZh) | `28.47% <0.00%> (-0.50%)` | :arrow_down: |
   | [.../org/apache/uniffle/common/util/UnitConverter.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi91dGlsL1VuaXRDb252ZXJ0ZXIuamF2YQ==) | `47.36% <0.00%> (+0.22%)` | :arrow_up: |
   | [...he/uniffle/server/buffer/ShuffleBufferManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9idWZmZXIvU2h1ZmZsZUJ1ZmZlck1hbmFnZXIuamF2YQ==) | `83.19% <0.00%> (+0.42%)` | :arrow_up: |
   | ... and [1 more](https://codecov.io/gh/apache/incubator-uniffle/pull/192/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
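The totals in this report are internally consistent: `Lines` equals `Hits + Misses + Partials` in both columns (4932 + 3254 + 251 = 8437 and 5071 + 3266 + 259 = 8596), and coverage is `Hits / Lines`. The sketch below reproduces the displayed percentages, assuming they are rounded down to two decimals (consistent with Codecov's default `round: down`, `precision: 2`) and that the delta is taken from the unrounded values. `CoverageCheck` is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper: recomputes the Codecov totals from the Hits/Misses/Partials rows.
public class CoverageCheck {

  /** Coverage percentage as hits / (hits + misses + partials), kept at 10 decimal places. */
  static BigDecimal exact(long hits, long misses, long partials) {
    long lines = hits + misses + partials;
    return BigDecimal.valueOf(hits * 100).divide(BigDecimal.valueOf(lines), 10, RoundingMode.HALF_EVEN);
  }

  /** Truncates to the two decimals shown in the report (assumed "round down"). */
  static BigDecimal shown(BigDecimal value) {
    return value.setScale(2, RoundingMode.DOWN);
  }

  public static void main(String[] args) {
    BigDecimal master = exact(4932, 3254, 251); // 8437 lines
    BigDecimal pr = exact(5071, 3266, 259);     // 8596 lines
    System.out.println("master: " + shown(master) + "%");               // master: 58.45%
    System.out.println("#192:   " + shown(pr) + "%");                   // #192:   58.99%
    System.out.println("delta:  +" + shown(pr.subtract(master)) + "%"); // delta:  +0.53%
  }
}
```

Rounding each value first and then subtracting would give 0.54 rather than the reported 0.53, which is why the delta is computed from the unrounded percentages.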
   

