You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "imply-cheddar (via GitHub)" <gi...@apache.org> on 2023/05/11 00:07:18 UTC

[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1190489402


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    StorageSlot[] slots = new StorageSlot[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      final File slotDirectory = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+      slots[i] = new StorageSlot(slotDirectory, sizePerSlot);
+    }

Review Comment:
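  ?? We *do* expect more slots than storage directories. I don't expect more storage directories than slots, but if that happens, it just means that some of those storage locations won't be used. For example, with 2 slots and 3 directories, the slots land in the first two directories and the third stays empty. That's a great test case to cover.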



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org