You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@uniffle.apache.org by "advancedxy (via GitHub)" <gi...@apache.org> on 2023/03/05 12:45:02 UTC

[GitHub] [incubator-uniffle] advancedxy commented on a diff in pull request #680: [#678] improvement: Write hdfs files asynchronously when `detectStorage`

advancedxy commented on code in PR #680:
URL: https://github.com/apache/incubator-uniffle/pull/680#discussion_r1125659829


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/SelectStorageStrategy.java:
##########
@@ -17,11 +17,15 @@
 
 package org.apache.uniffle.coordinator.strategy.storage;
 
+import java.util.Comparator;
+
 import org.apache.uniffle.common.RemoteStorageInfo;
 
 public interface SelectStorageStrategy {
 
   void detectStorage();
 
   RemoteStorageInfo pickStorage(String appId);
+
+  Comparator getComparator();

Review Comment:
   I believe `getComparator` belongs in the abstract class rather than in this interface.



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java:
##########
@@ -76,6 +102,72 @@ public void readAndWriteHdfsStorage(FileSystem fs, Path testPath,
     }
   }
 
+  @Override
+  public void detectStorage() {
+    uris = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+    if (remoteStoragePathRankValue.size() > 1) {
+      CountDownLatch countDownLatch = new CountDownLatch(uris.size());
+      uris.parallelStream().forEach(uri -> {
+        if (uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
+          Path remotePath = new Path(uri.getKey());
+          String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId()
+              + Thread.currentThread().getName();
+          Path testPath = new Path(rssTest);
+          RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
+          rankValue.setHealthy(new AtomicBoolean(true));
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, hdfsConf);
+            for (int j = 0; j < readAndWriteTimes; j++) {
+              readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error, we will not use this remote path {}.", uri, e);
+            rankValue.setHealthy(new AtomicBoolean(false));
+          } finally {
+            sortPathByRankValue(uri.getKey(), rssTest, startWriteTime);
+          }
+          countDownLatch.countDown();
+        }
+      });
+      try {
+        countDownLatch.await();
+      } catch (InterruptedException e) {
+        LOG.error("Failed to detectStorage!");
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByRankValue(
+      String path, String testPath, long startWrite) {
+    RankValue rankValue = remoteStoragePathRankValue.get(path);
+    try {
+      FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path), hdfsConf);
+      fs.delete(new Path(testPath), true);
+      if (rankValue.getHealthy().get()) {
+        rankValue.setCostTime(new AtomicLong(System.currentTimeMillis() - startWrite));
+      }
+    } catch (Exception e) {
+      rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
+      LOG.error("Failed to sort, we will not use this remote path {}.", path, e);
+    }
+
+    if (this.getComparator() != null) {
+      uris = Lists.newCopyOnWriteArrayList(
+          remoteStoragePathRankValue.entrySet()).stream().filter(
+          Objects::nonNull).sorted(this.getComparator()).collect(Collectors.toList());
+    } else {
+      uris = Lists.newCopyOnWriteArrayList(
+          remoteStoragePathRankValue.entrySet()).stream().filter(
+          Objects::nonNull).collect(Collectors.toList());
+    }
+    LOG.info("The sorted remote path list is: {}", uris);
+  }
+
+  @Override
+  public abstract Comparator<Map.Entry<String, RankValue>> getComparator();

Review Comment:
   Please add some Javadoc here describing what the comparator is used for.



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java:
##########
@@ -18,36 +18,62 @@
 package org.apache.uniffle.coordinator.strategy.storage;
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
 /**
  * This is a simple implementation class, which provides some methods to check whether the path is normal
  */
 public abstract class AbstractSelectStorageStrategy implements SelectStorageStrategy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractSelectStorageStrategy.class);
   /**
    * store remote path -> application count for assignment strategy
    */
   protected final Map<String, RankValue> remoteStoragePathRankValue;
   protected final int fileSize;
   private final String coordinatorId;
+  private final Configuration hdfsConf;
+  protected List<Map.Entry<String, RankValue>> uris;
+  private int readAndWriteTimes = 1;
 
   public AbstractSelectStorageStrategy(
       Map<String, RankValue> remoteStoragePathRankValue,
       CoordinatorConf conf) {
     this.remoteStoragePathRankValue = remoteStoragePathRankValue;
-    fileSize = conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+    this.hdfsConf = new Configuration();
+    this.fileSize = conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
     this.coordinatorId = conf.getString(CoordinatorUtils.COORDINATOR_ID, UUID.randomUUID().toString());
+    ApplicationManager.StrategyName strategyName = conf.get(
+        CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY);
+    if (strategyName == ApplicationManager.StrategyName.IO_SAMPLE) {

Review Comment:
   This is weird: the abstract class should not have to check which concrete strategy is configured.
   
   I believe a better way would be:
   1. define a new method: `protected int readAndWriteTimes(CoordinatorConf conf)`
   2. override it in `LowestIOSampleCostSelectStorageStrategy`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org