Posted to commits@druid.apache.org by "imply-cheddar (via GitHub)" <gi...@apache.org> on 2023/05/10 06:38:11 UTC

[GitHub] [druid] imply-cheddar opened a new pull request, #14239: Be able to load segments on Peons

imply-cheddar opened a new pull request, #14239:
URL: https://github.com/apache/druid/pull/14239

   This change introduces a new config on WorkerConfig that indicates how many bytes of each storage
   location to use for storage of a task.  That value is divided among the locations and slots
   and then used to set TaskConfig.tmpStorageBytesPerTask.
   
   The Peons use their local task dir and tmpStorageBytesPerTask as their StorageLocations for the
   SegmentManager, so that they can accept broadcast segments.
   
   TODO: this change requires docs updates before merge.  I'd like to get a sense for the tests and stuff first.
   
   #### Release note
   
   Peons now have the ability to store and work with segments broadcast to them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1189674946


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    StorageSlot[] slots = new StorageSlot[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      final File slotDirectory = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+      slots[i] = new StorageSlot(slotDirectory, sizePerSlot);
+    }

Review Comment:
   It would be useful to clarify that we don't expect more base directories than slots. In such a configuration, some base directories will remain unused, since a task is allowed to use at most one directory.
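
To make the concern concrete, here is a minimal sketch of the slot-to-directory mapping used in the hunk above. The class `SlotLayoutSketch` and its method are hypothetical illustrations, not part of Druid; only the `i % size` / `i / size` arithmetic and the `slot%d` naming are taken from the diff.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Druid: mirrors the layout loop in fromBaseDirs. */
final class SlotLayoutSketch
{
  /** Slot i lands in directory (i % dirs), in sub-directory "slot" + (i / dirs). */
  static List<File> slotDirectories(List<File> baseTaskDirs, int numSlots)
  {
    final List<File> slots = new ArrayList<>();
    for (int i = 0; i < numSlots; ++i) {
      final int whichDir = i % baseTaskDirs.size();
      final int dirUsageCount = i / baseTaskDirs.size();
      slots.add(new File(baseTaskDirs.get(whichDir), "slot" + dirUsageCount));
    }
    return slots;
  }

  public static void main(String[] args)
  {
    final List<File> dirs = Arrays.asList(new File("/a"), new File("/b"), new File("/c"));
    // Two slots over three directories: /c never receives a slot.
    System.out.println(slotDirectories(dirs, 2));
    // Five slots over three directories: /a and /b hold two slots each, /c holds one.
    System.out.println(slotDirectories(dirs, 5));
  }
}
```

With fewer slots than directories, the trailing directories are simply never assigned, which is the unused-directory case described in the comment.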



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;

Review Comment:
   The directories might not all be the same size, though? It's OK if we enforce/assume that. Something to call out in the docs PR: the same amount of space is reserved from every base directory.
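
A worked example of the per-slot arithmetic in the hunk above may help when writing those docs. This is a sketch only: `SlotSizeSketch` is a hypothetical name, and the byte values are made up for illustration. The formula itself is copied from this revision of `fromBaseDirs`.

```java
/** Hypothetical helper, not part of Druid: reproduces the sizePerSlot arithmetic from the hunk. */
final class SlotSizeSketch
{
  static long sizePerSlot(int numBaseDirs, int numSlots, long dirSize)
  {
    int slotsPerBaseTaskDir = Math.max(1, numSlots / numBaseDirs);
    if (numSlots % numBaseDirs > 0) {
      // Uneven division: budget for one extra slot per directory.
      ++slotsPerBaseTaskDir;
    }
    // The same dirSize is assumed for every base directory.
    return dirSize / slotsPerBaseTaskDir;
  }

  public static void main(String[] args)
  {
    System.out.println(sizePerSlot(2, 4, 600L)); // 2 slots per directory -> 300
    System.out.println(sizePerSlot(2, 5, 600L)); // 3 slots per directory -> 200
    System.out.println(sizePerSlot(3, 2, 600L)); // fewer slots than directories -> 300
  }
}
```

Note the last case: with 2 slots and 3 directories, this revision still adds the extra slot and yields 300 even though no directory holds more than one slot. The later revision quoted at the end of this thread restructures the check as an `else if`, which would yield 600 for the same inputs.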



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    StorageSlot[] slots = new StorageSlot[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      final File slotDirectory = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+      slots[i] = new StorageSlot(slotDirectory, sizePerSlot);
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slots);
   }
 
+  /**
+   * The base task dirs, this field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced.  All of the tasks were just splatted together
+   * into one directory.  If we want to be able to restore the tasks, we need to be able to find them
+   * at the old locations and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  public TaskStorageDirTracker(List<File> baseTaskDirs, StorageSlot[] slots)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = slots;
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              slot
           );
         }
       }
     }
   }
 
-  public File pickBaseDir(String taskId)
+  public synchronized StorageSlot pickStorageSlot(String taskId)
   {
-    if (baseTaskDirs.length == 1) {
-      return baseTaskDirs[0];
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (StorageSlot slot : slots) {
+      if (slot.runningTaskId != null && slot.runningTaskId.equals(taskId)) {
+        return slot;
+      }
     }
 
-    // if the task directory already exists, we want to give it precedence, so check.
-    for (File baseTaskDir : baseTaskDirs) {
-      if (new File(baseTaskDir, taskId).exists()) {
-        return baseTaskDir;
+    // if it doesn't exist, pick one round-robin and ensure it is unused.
+    for (int i = 0; i < slots.length; ++i) {
+      // This can be negative, so abs() it.
+      final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % slots.length);
+      final StorageSlot candidateSlot = slots[currIncrement % slots.length];
+      if (candidateSlot.runningTaskId != null) {
+        continue;
       }
+      candidateSlot.runningTaskId = taskId;
+      return candidateSlot;
     }
+    throw new ISE("Unable to pick a free slot, this should never happen, slot status [%s].", Arrays.toString(slots));
+  }
 
-    // if it doesn't exist, pick one round-robin and return.  This can be negative, so abs() it
-    final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % baseTaskDirs.length);
-    return baseTaskDirs[currIncrement % baseTaskDirs.length];
+  public synchronized void returnStorageSlot(StorageSlot slot)
+  {
+    slot.runningTaskId = null;
   }
 
   @Nullable
-  public File findExistingTaskDir(String taskId)
+  public synchronized File findExistingTaskDir(String taskId)
   {
+    File candidateLocation = null;
     if (baseTaskDirs.length == 1) {
-      return new File(baseTaskDirs[0], taskId);
+      candidateLocation = new File(baseTaskDirs[0], taskId);
+    } else {
+      for (File baseTaskDir : baseTaskDirs) {
+        File maybeExists = new File(baseTaskDir, taskId);
+        if (maybeExists.exists()) {
+          candidateLocation = maybeExists;
+          break;
+        }
+      }
     }
 
-    for (File baseTaskDir : baseTaskDirs) {
-      final File candidateLocation = new File(baseTaskDir, taskId);
-      if (candidateLocation.exists()) {
-        return candidateLocation;
+    if (candidateLocation != null && candidateLocation.exists()) {
+      // task exists at old location, relocate to a "good" slot location and return that.
+      final StorageSlot taskSlot = pickStorageSlot(taskId);
+      final File pickedLocation = new File(taskSlot.getDirectory(), taskId);
+      if (candidateLocation.renameTo(pickedLocation)) {
+        taskSlot.runningTaskId = taskId;
+        return pickedLocation;
+      } else {
+        throw new ISE("Unable to relocate task ([%s] -> [%s])", candidateLocation, pickedLocation);

Review Comment:
   Should the slot that was picked be freed here?
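
One way to address this, sketched under assumptions: release the slot before throwing when the rename fails. `RelocateSketch` and its nested `Slot` are hypothetical stand-ins for the tracker and `StorageSlot`, and the sketch throws the JDK's `IllegalStateException` rather than Druid's `ISE`.

```java
import java.io.File;

/** Hypothetical sketch, not Druid code: frees the slot when relocation fails. */
final class RelocateSketch
{
  /** Trimmed-down stand-in for TaskStorageDirTracker.StorageSlot. */
  static final class Slot
  {
    final File directory;
    String runningTaskId;

    Slot(File directory)
    {
      this.directory = directory;
    }
  }

  /** Moves oldLocation into the slot; on failure the slot is marked free again. */
  static File relocate(Slot slot, String taskId, File oldLocation)
  {
    slot.runningTaskId = taskId;
    final File pickedLocation = new File(slot.directory, taskId);
    if (oldLocation.renameTo(pickedLocation)) {
      return pickedLocation;
    }
    // Rename failed: give the slot back so a later task can still use it.
    slot.runningTaskId = null;
    throw new IllegalStateException(
        "Unable to relocate task ([" + oldLocation + "] -> [" + pickedLocation + "])"
    );
  }
}
```

Without the release, a failed relocation would leave the slot marked as occupied by a task that never started there.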





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1189456536


##########
server/src/test/java/org/apache/druid/server/log/LoggingRequestLoggerProviderTest.java:
##########
@@ -77,7 +77,7 @@
     final Properties properties = new Properties();
     properties.put(propertyPrefix + ".type", "noop");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    Assert.assertThat(provider.get().get().get(), Matchers.instanceOf(NoopRequestLogger.class));
+    Assert.assertThat(provider.get().get(), Matchers.instanceOf(NoopRequestLogger.class));

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Assert.assertThat](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4934)



##########
server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java:
##########
@@ -126,7 +126,7 @@
   {
     properties.put(PROPERTY_PREFIX + ".numBackgroundThreads", "BABBA YAGA");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();

Review Comment:
   ## Unread local variable
   
   Variable 'CacheConfig config' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4935)



##########
server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java:
##########
@@ -154,7 +154,7 @@
   {
     properties.put(PROPERTY_PREFIX + ".populateCache", "FaLse");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();

Review Comment:
   ## Unread local variable
   
   Variable 'CacheConfig config' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4938)



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java:
##########
@@ -167,7 +167,7 @@
                             );
 
                           }
-                          final File taskDir = new File(baseDirForTask, task.getId());
+                          final File taskDir = new File(storageSlot.getDirectory(), task.getId());

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4940)



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -154,20 +153,19 @@
                 @Override
                 public TaskStatus call()
                 {
-
-                  final File baseDirForTask;
+                  final TaskStorageDirTracker.StorageSlot storageSlot;
                   try {
-                    baseDirForTask = getTracker().pickBaseDir(task.getId());
+                    storageSlot = getTracker().pickStorageSlot(task.getId());
                   }
                   catch (RuntimeException e) {
-                    LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
+                    LOG.warn(e, "Failed to get storage slot for task [%s], cannot schedule.", task.getId());
                     return TaskStatus.failure(
                         task.getId(),
-                        StringUtils.format("Could not schedule due to error [%s]", e.getMessage())
+                        StringUtils.format("Failed to get storage slot due to error [%s]", e.getMessage())
                     );
                   }
 
-                  final File taskDir = new File(baseDirForTask, task.getId());
+                  final File taskDir = new File(storageSlot.getDirectory(), task.getId());

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4939)
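
For context on this class of finding, a common mitigation is to check that the resolved path stays inside the intended parent directory. The following is a generic sketch of that technique, not what the PR does; `TaskDirSketch` and `resolveTaskDir` are hypothetical names.

```java
import java.io.File;
import java.nio.file.Path;

/** Hypothetical sketch, not Druid code: rejects task ids that escape the slot directory. */
final class TaskDirSketch
{
  static File resolveTaskDir(File slotDirectory, String taskId)
  {
    final Path base = slotDirectory.toPath().toAbsolutePath().normalize();
    // normalize() collapses "." and ".." segments before the containment check.
    final Path resolved = base.resolve(taskId).normalize();
    if (resolved.equals(base) || !resolved.startsWith(base)) {
      throw new IllegalArgumentException(
          "Task id [" + taskId + "] does not resolve to a path under [" + base + "]"
      );
    }
    return resolved.toFile();
  }
}
```

An id such as `../../etc` or an absolute path fails the `startsWith` check, while an ordinary id resolves to a child of the slot directory.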



##########
server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java:
##########
@@ -144,7 +144,7 @@
   {
     properties.put(PROPERTY_PREFIX + ".populateCache", "FALSE");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();

Review Comment:
   ## Unread local variable
   
   Variable 'CacheConfig config' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4937)



##########
server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java:
##########
@@ -135,7 +135,7 @@
   {
     properties.put(PROPERTY_PREFIX + ".populateCache", "TRUE");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();

Review Comment:
   ## Unread local variable
   
   Variable 'CacheConfig config' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4936)



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [workerConfig](1) may be null at this access as suggested by [this](2) null guard.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4941)
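
The finding arises because the hunk checks `workerConfig == null` and then dereferences it unconditionally on the last line. One possible resolution, sketched here with hypothetical names (`FromConfigsSketch`, `WorkerSettings`), is to fail fast on a null config and keep only the `basePaths == null` fallback.

```java
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical sketch, not Druid code: resolves base dirs without a nullable config. */
final class FromConfigsSketch
{
  /** Trimmed-down stand-in for WorkerConfig; getBaseTaskDirs() may return null. */
  interface WorkerSettings
  {
    List<String> getBaseTaskDirs();
  }

  static List<File> resolveBaseDirs(WorkerSettings workerConfig, File defaultBaseTaskDir)
  {
    // Fail fast instead of guarding and then dereferencing anyway.
    Objects.requireNonNull(workerConfig, "workerConfig");
    final List<String> basePaths = workerConfig.getBaseTaskDirs();
    if (basePaths == null) {
      return Collections.singletonList(defaultBaseTaskDir);
    }
    return basePaths.stream().map(File::new).collect(Collectors.toList());
  }
}
```

The later revision of this file quoted further down the thread appears to take a similar direction by dropping the null branch entirely.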





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192095295


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -304,82 +290,48 @@ public TaskStatus call()
                         if (context != null) {
                           for (String propName : context.keySet()) {
                             if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
-                              command.add(
-                                  StringUtils.format(
-                                  "-D%s=%s",
+                              command.addSystemProperty(
                                   propName.substring(CHILD_PROPERTY_PREFIX.length()),
                                   task.getContextValue(propName)
-                                )
                               );
                             }
                           }
                         }
 
                         // add the attemptId as a system property
-                        command.add(
-                            StringUtils.format(
-                                "-D%s=%s",
-                                "attemptId",
-                                "1"
-                            )
-                        );
+                        command.addSystemProperty("attemptId", "1");
 
                         // Add dataSource, taskId and taskType for metrics or logging
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.DATASOURCE,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE,
                             task.getDataSource()
-                          )
                         );
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.TASK_ID,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID,
                             task.getId()
-                          )
                         );
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.TASK_TYPE,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
                             task.getType()
-                          )
                         );
 
-                        command.add(StringUtils.format("-Ddruid.host=%s", childHost));
-                        command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
-                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
+
+                        command.addSystemProperty("druid.host", childHost);
+                        command.addSystemProperty("druid.plaintextPort", childPort);
+                        command.addSystemProperty("druid.tlsPort", tlsChildPort);
 
                         // Let tasks know where they are running on.
                         // This information is used in native parallel indexing with shuffle.
-                        command.add(StringUtils.format("-Ddruid.task.executor.service=%s", node.getServiceName()));
-                        command.add(StringUtils.format("-Ddruid.task.executor.host=%s", node.getHost()));
-                        command.add(
-                            StringUtils.format("-Ddruid.task.executor.plaintextPort=%d", node.getPlaintextPort())
-                        );
-                        command.add(
-                            StringUtils.format(
-                                "-Ddruid.task.executor.enablePlaintextPort=%s",
-                                node.isEnablePlaintextPort()
-                            )
-                        );
-                        command.add(StringUtils.format("-Ddruid.task.executor.tlsPort=%d", node.getTlsPort()));
-                        command.add(
-                            StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
-                        );
-                        command.add(StringUtils.format("-Dlog4j2.configurationFactory=%s", ConsoleLoggingEnforcementConfigurationFactory.class.getName()));
+                        command.addSystemProperty("druid.task.executor.service", node.getServiceName());
+                        command.addSystemProperty("druid.task.executor.host", node.getHost());
+                        command.addSystemProperty("druid.task.executor.plaintextPort", node.getPlaintextPort());
+                        command.addSystemProperty("druid.task.executor.enablePlaintextPort", node.isEnablePlaintextPort());
+                        command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort());
+                        command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort());
+                        command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName());
 
-                        // These are not enabled per default to allow the user to either set or not set them
-                        // Users are highly suggested to be set in druid.indexer.runner.javaOpts
-                        // See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
-                        // for more information
-                        // command.add("-XX:+UseThreadPriorities");
-                        // command.add("-XX:ThreadPriorityPolicy=42");

Review Comment:
   They were commented out.  Commented-out code is pointless: it just generates questions of "is this necessary?" and then sits there from release to release because nobody knows why it's there or when it can be uncommented.  I cleaned it up.  If the flags are necessary, they can be added back as live code.
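
The hunk above also replaces repeated `StringUtils.format("-D%s=%s", ...)` calls with `command.addSystemProperty(...)`. The helper itself is not shown in this message, so the following is only a guess at its shape: `CommandListSketch` and all of its members are hypothetical, with overloads chosen to match the String, int and boolean arguments visible in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch, not the PR's class: collects JVM arguments for a child process. */
final class CommandListSketch
{
  private final List<String> args = new ArrayList<>();

  CommandListSketch add(String arg)
  {
    args.add(arg);
    return this;
  }

  /** Appends a "-Dname=value" argument. */
  CommandListSketch addSystemProperty(String name, String value)
  {
    return add("-D" + name + "=" + value);
  }

  CommandListSketch addSystemProperty(String name, int value)
  {
    return addSystemProperty(name, String.valueOf(value));
  }

  CommandListSketch addSystemProperty(String name, boolean value)
  {
    return addSystemProperty(name, String.valueOf(value));
  }

  List<String> toList()
  {
    return Collections.unmodifiableList(args);
  }
}
```

Centralizing the `-D` formatting in one method keeps each call site down to a name and a value, which is the readability gain the diff shows.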





[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1191964565


##########
docs/ingestion/tasks.md:
##########
@@ -419,6 +419,19 @@ You can configure retention periods for logs in milliseconds by setting `druid.i
 
 > Automatic log file deletion typically works based on the log file's 'modified' timestamp in the back-end store.  Large clock skews between Druid processes and the long-term store might result in unintended behavior.
 
+## Configuring task storage sizes
+
+Tasks sometimes need to use local disk for storage of things while the task is active.  For example, for realtime ingestion tasks to accept broadcast segments for broadcast joins.  Or intermediate data sets for Multi-stage Query jobs
+
+Task storage sizes are configured through a combination of three properties:
+1. `druid.worker.capacity` - i.e. the "number of task slots"
+2. `druid.worker.baseTaskDirs` - i.e. the list directories to use for task storage 

Review Comment:
   ```suggestion
   2. `druid.worker.baseTaskDirs` - i.e. the list of directories to use for task storage. 
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
+  private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    final List<String> basePaths = workerConfig.getBaseTaskDirs();
+    final List<File> baseTaskDirs;
+
+    if (basePaths == null) {
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
-      final List<String> basePaths = workerConfig.getBaseTaskDirs();
-      if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
-      }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
+      baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
+    }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+    if (slotsPerBaseTaskDir == 0) {
+      slotsPerBaseTaskDir = 1;
+    } else if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
     }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    File[] slotDirs = new File[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      slotDirs[i] = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
   }
 
+  /**
+   * The base task dirs, this field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced.  All of the tasks were just splatted together
+   * into one directory.  If we want to be able to restore the tasks, we need to be able to find them
+   * at the old locations and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long sizePerSlot)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = new StorageSlot[slotDirs.length];
+    for (int i = 0; i < slotDirs.length; ++i) {
+      slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+    }
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",

Review Comment:
   Hmm. The directory doesn't need to exist beforehand, since we create it at JVM startup. The error message should just say that the directory could not be created and that the user needs the relevant permissions.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
+  private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    final List<String> basePaths = workerConfig.getBaseTaskDirs();
+    final List<File> baseTaskDirs;
+
+    if (basePaths == null) {
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
-      final List<String> basePaths = workerConfig.getBaseTaskDirs();
-      if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
-      }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
+      baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
+    }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+    if (slotsPerBaseTaskDir == 0) {
+      slotsPerBaseTaskDir = 1;
+    } else if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
     }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    File[] slotDirs = new File[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      slotDirs[i] = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
   }
 
+  /**
+   * The base task dirs, this field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced.  All of the tasks were just splatted together
+   * into one directory.  If we want to be able to restore the tasks, we need to be able to find them
+   * at the old locations and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long sizePerSlot)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = new StorageSlot[slotDirs.length];
+    for (int i = 0; i < slotDirs.length; ++i) {
+      slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+    }
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              slot
           );
         }
       }
     }
   }
 
-  public File pickBaseDir(String taskId)
+  public synchronized StorageSlot pickStorageSlot(String taskId)
   {
-    if (baseTaskDirs.length == 1) {
-      return baseTaskDirs[0];
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (StorageSlot slot : slots) {
+      if (slot.runningTaskId != null && slot.runningTaskId.equals(taskId)) {
+        return slot;
+      }
     }
 
-    // if the task directory already exists, we want to give it precedence, so check.
-    for (File baseTaskDir : baseTaskDirs) {
-      if (new File(baseTaskDir, taskId).exists()) {
-        return baseTaskDir;
+    // if it doesn't exist, pick one round-robin and ensure it is unused.
+    for (int i = 0; i < slots.length; ++i) {
+      // This can be negative, so abs() it.
+      final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % slots.length);
+      final StorageSlot candidateSlot = slots[currIncrement % slots.length];
+      if (candidateSlot.runningTaskId == null) {
+        candidateSlot.runningTaskId = taskId;
+        return candidateSlot;
       }
     }
+    throw new ISE("Unable to pick a free slot, this should never happen, slot status [%s].", Arrays.toString(slots));
+  }
 
-    // if it doesn't exist, pick one round-robin and return.  This can be negative, so abs() it
-    final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % baseTaskDirs.length);
-    return baseTaskDirs[currIncrement % baseTaskDirs.length];
+  public synchronized void returnStorageSlot(StorageSlot slot)
+  {
+    if (slot.getParentRef() == this) {

Review Comment:
   Interesting. Can there be multiple TaskStorageDirTracker instances in one JVM?



##########
server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java:
##########
@@ -80,11 +83,35 @@ public int getCapacity()
     return capacity;
   }
 
+  public WorkerConfig setCapacity(int capacity)
+  {
+    this.capacity = capacity;
+    return this;
+  }
+
+  public long getBaseTaskDirSize()
+  {
+    return baseTaskDirSize;
+  }
+
+  public WorkerConfig setBaseTaskDirSize(long baseTaskDirSize)
+  {
+    this.baseTaskDirSize = baseTaskDirSize;
+    return this;
+  }
+
   public List<String> getBaseTaskDirs()
   {
     return baseTaskDirs;
   }
 
+  public WorkerConfig setBaseTaskDirs(List<String> baseTaskDirs)

Review Comment:
   I haven't seen setters in config classes in the Druid code before, though I don't know whether that's intentional.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;

Review Comment:
   If we have
    three directories - A, B, C
    dir size - 200 GB each
    10 worker slots
    
    we get slotsPerBaseTaskDir - 4
    sizePerSlot - 50 GB
    
    so the 10 slots use 500 GB of the 600 GB available, and 100 GB is left unused. Is that calculation correct?
    
    I think it's OK, but maybe we can call out this nuance in the docs? Users can always adjust the worker capacity or volume count if they want the whole space to be used.
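
The arithmetic in the comment above can be checked with a small standalone sketch. It mirrors the `Math.max` / remainder logic from the quoted `fromBaseDirs` hunk; the class name `SlotSizingExample`, its method names, and the use of whole gigabytes as the unit are assumptions made for illustration and are not part of the PR.

```java
// Hypothetical helper that reproduces the slot-size arithmetic from the quoted hunk.
final class SlotSizingExample
{
  /** Number of slots each base directory must be able to hold, rounded up when slots do not divide evenly. */
  static int slotsPerBaseTaskDir(int numSlots, int numDirs)
  {
    int slotsPerDir = Math.max(1, numSlots / numDirs);
    if (numSlots % numDirs > 0) {
      ++slotsPerDir;
    }
    return slotsPerDir;
  }

  /** Size given to every slot; the same value is used for all directories. */
  static long sizePerSlot(int numSlots, int numDirs, long dirSize)
  {
    return dirSize / slotsPerBaseTaskDir(numSlots, numDirs);
  }

  /** Space across all directories that no slot is ever assigned. */
  static long unused(int numSlots, int numDirs, long dirSize)
  {
    return (long) numDirs * dirSize - (long) numSlots * sizePerSlot(numSlots, numDirs, dirSize);
  }

  public static void main(String[] args)
  {
    // Three directories of 200 GB each and 10 worker slots, as in the comment above.
    System.out.println("slotsPerBaseTaskDir = " + slotsPerBaseTaskDir(10, 3));
    System.out.println("sizePerSlot (GB)    = " + sizePerSlot(10, 3, 200L));
    System.out.println("unused (GB)         = " + unused(10, 3, 200L));
  }
}
```

With these inputs the sketch gives 4 slots per directory, 50 GB per slot, and 100 GB unassigned, which matches the numbers in the comment.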
    



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java:
##########
@@ -258,6 +265,8 @@ public TaskStatus call()
                                 }
                               }
 
+                              getTracker().returnStorageSlot(storageSlot);

Review Comment:
   Similar comment here re sequencing. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -924,5 +870,50 @@ static int getNextAttemptID(File taskDir)
     }
     return maxAttempt + 1;
   }
+
+  public static class CommandListBuilder

Review Comment:
   Neat 👍 
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -477,6 +429,8 @@ public TaskStatus call()
                         portFinder.markPortUnused(tlsChildPort);
                       }
 
+                      getTracker().returnStorageSlot(storageSlot);

Review Comment:
   Shouldn't you delete the directories before returning the slot, similar to how BaseRestorableTaskRunner does it? In that class, if deleting the directory fails for any reason, the slot does not get unreserved.
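
A minimal sketch of the ordering being asked about: delete the task directory first, and release the slot only if the delete succeeded. Everything here (`SlotCleanupOrderExample`, `Slot`, `cleanUpThenRelease`, the `reserved` flag) is hypothetical and stands in for the PR's real classes; it illustrates the sequencing only and is not how ForkingTaskRunner or BaseRestorableTaskRunner are actually written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class SlotCleanupOrderExample
{
  /** Hypothetical stand-in for a storage slot; not the PR's StorageSlot class. */
  static final class Slot
  {
    final Path directory;
    boolean reserved = true;

    Slot(Path directory)
    {
      this.directory = directory;
    }
  }

  /** Deletes the task directory first; the slot is released only if the delete did not throw. */
  static void cleanUpThenRelease(Slot slot, String taskId) throws IOException
  {
    deleteRecursively(slot.directory.resolve(taskId));
    slot.reserved = false;
  }

  static void deleteRecursively(Path root) throws IOException
  {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse order so that children are deleted before their parent directories.
      paths.sorted(Comparator.reverseOrder()).forEach(path -> {
        try {
          Files.delete(path);
        }
        catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }

  public static void main(String[] args) throws IOException
  {
    Path slotDir = Files.createTempDirectory("slot0");
    Files.createDirectories(slotDir.resolve("task-1").resolve("work"));

    Slot slot = new Slot(slotDir);
    cleanUpThenRelease(slot, "task-1");
    System.out.println("reserved=" + slot.reserved
                       + ", taskDirExists=" + Files.exists(slotDir.resolve("task-1")));
  }
}
```

If the delete throws, `reserved` stays `true`, so a later task would not be handed a slot that still holds leftover files.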



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -304,82 +290,48 @@ public TaskStatus call()
                         if (context != null) {
                           for (String propName : context.keySet()) {
                             if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
-                              command.add(
-                                  StringUtils.format(
-                                  "-D%s=%s",
+                              command.addSystemProperty(
                                   propName.substring(CHILD_PROPERTY_PREFIX.length()),
                                   task.getContextValue(propName)
-                                )
                               );
                             }
                           }
                         }
 
                         // add the attemptId as a system property
-                        command.add(
-                            StringUtils.format(
-                                "-D%s=%s",
-                                "attemptId",
-                                "1"
-                            )
-                        );
+                        command.addSystemProperty("attemptId", "1");
 
                         // Add dataSource, taskId and taskType for metrics or logging
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.DATASOURCE,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE,
                             task.getDataSource()
-                          )
                         );
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.TASK_ID,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID,
                             task.getId()
-                          )
                         );
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.TASK_TYPE,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
                             task.getType()
-                          )
                         );
 
-                        command.add(StringUtils.format("-Ddruid.host=%s", childHost));
-                        command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
-                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
+
+                        command.addSystemProperty("druid.host", childHost);
+                        command.addSystemProperty("druid.plaintextPort", childPort);
+                        command.addSystemProperty("druid.tlsPort", tlsChildPort);
 
                         // Let tasks know where they are running on.
                         // This information is used in native parallel indexing with shuffle.
-                        command.add(StringUtils.format("-Ddruid.task.executor.service=%s", node.getServiceName()));
-                        command.add(StringUtils.format("-Ddruid.task.executor.host=%s", node.getHost()));
-                        command.add(
-                            StringUtils.format("-Ddruid.task.executor.plaintextPort=%d", node.getPlaintextPort())
-                        );
-                        command.add(
-                            StringUtils.format(
-                                "-Ddruid.task.executor.enablePlaintextPort=%s",
-                                node.isEnablePlaintextPort()
-                            )
-                        );
-                        command.add(StringUtils.format("-Ddruid.task.executor.tlsPort=%d", node.getTlsPort()));
-                        command.add(
-                            StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
-                        );
-                        command.add(StringUtils.format("-Dlog4j2.configurationFactory=%s", ConsoleLoggingEnforcementConfigurationFactory.class.getName()));
+                        command.addSystemProperty("druid.task.executor.service", node.getServiceName());
+                        command.addSystemProperty("druid.task.executor.host", node.getHost());
+                        command.addSystemProperty("druid.task.executor.plaintextPort", node.getPlaintextPort());
+                        command.addSystemProperty("druid.task.executor.enablePlaintextPort", node.isEnablePlaintextPort());
+                        command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort());
+                        command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort());
+                        command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName());
 
-                        // These are not enabled per default to allow the user to either set or not set them
-                        // Users are highly suggested to be set in druid.indexer.runner.javaOpts
-                        // See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
-                        // for more information
-                        // command.add("-XX:+UseThreadPriorities");
-                        // command.add("-XX:ThreadPriorityPolicy=42");

Review Comment:
   Are these no longer required?





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1190490466


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;

Review Comment:
   Yes, this config explicitly forces symmetry across the locations.  That diverges from the historical "storage location", which allows non-symmetrical sizes.  Forcing symmetry makes it much easier to divide up the given sizes.  If the locations were not symmetrical, say location A with 200GB, location B with 350GB, and location C with 1TB, and there were 8 worker slots, what would be the correct size to give to each of those tasks?





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192096101


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
+  private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    final List<String> basePaths = workerConfig.getBaseTaskDirs();
+    final List<File> baseTaskDirs;
+
+    if (basePaths == null) {
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
-      final List<String> basePaths = workerConfig.getBaseTaskDirs();
-      if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
-      }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
+      baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
+    }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+    if (slotsPerBaseTaskDir == 0) {
+      slotsPerBaseTaskDir = 1;
+    } else if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
     }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    File[] slotDirs = new File[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      slotDirs[i] = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
   }
 
+  /**
+   * The base task dirs, this field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced.  All of the tasks were just splatted together
+   * into one directory.  If we want to be able to restore the tasks, we need to be able to find them
+   * at the old locations and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long sizePerSlot)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = new StorageSlot[slotDirs.length];
+    for (int i = 0; i < slotDirs.length; ++i) {
+      slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+    }
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              slot
           );
         }
       }
     }
   }
 
-  public File pickBaseDir(String taskId)
+  public synchronized StorageSlot pickStorageSlot(String taskId)
   {
-    if (baseTaskDirs.length == 1) {
-      return baseTaskDirs[0];
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (StorageSlot slot : slots) {
+      if (slot.runningTaskId != null && slot.runningTaskId.equals(taskId)) {
+        return slot;
+      }
     }
 
-    // if the task directory already exists, we want to give it precedence, so check.
-    for (File baseTaskDir : baseTaskDirs) {
-      if (new File(baseTaskDir, taskId).exists()) {
-        return baseTaskDir;
+    // if it doesn't exist, pick one round-robin and ensure it is unused.
+    for (int i = 0; i < slots.length; ++i) {
+      // This can be negative, so abs() it.
+      final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % slots.length);
+      final StorageSlot candidateSlot = slots[currIncrement % slots.length];
+      if (candidateSlot.runningTaskId == null) {
+        candidateSlot.runningTaskId = taskId;
+        return candidateSlot;
       }
     }
+    throw new ISE("Unable to pick a free slot, this should never happen, slot status [%s].", Arrays.toString(slots));
+  }
 
-    // if it doesn't exist, pick one round-robin and return.  This can be negative, so abs() it
-    final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % baseTaskDirs.length);
-    return baseTaskDirs[currIncrement % baseTaskDirs.length];
+  public synchronized void returnStorageSlot(StorageSlot slot)
+  {
+    if (slot.getParentRef() == this) {

Review Comment:
   There were multiple trackers in my tests, and I had a test that passed when I "returned" a slot from one tracker to a different one.  Adding this extra check seemed nice and not overly complex.
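
To make the ownership check concrete, here is a minimal sketch of a slot that remembers which tracker created it. `SlotOwnershipSketch`, `Slot`, `Tracker`, `pick` and `giveBack` are hypothetical stand-ins, not the PR's classes, and rejecting a foreign slot with an exception is an assumption, since the quoted hunk stops before showing what the PR does in that case.

```java
import java.io.File;

public class SlotOwnershipSketch {
  /** Hypothetical, pared-down stand-in for the PR's StorageSlot. */
  public static final class Slot {
    private final Tracker parent;
    private final File directory;
    private String runningTaskId; // null means the slot is free

    Slot(Tracker parent, File directory) {
      this.parent = parent;
      this.directory = directory;
    }

    public File getDirectory() {
      return directory;
    }

    public String getRunningTaskId() {
      return runningTaskId;
    }
  }

  /** Hypothetical stand-in for TaskStorageDirTracker. */
  public static final class Tracker {
    private final Slot[] slots;

    public Tracker(int numSlots) {
      slots = new Slot[numSlots];
      for (int i = 0; i < numSlots; ++i) {
        slots[i] = new Slot(this, new File("slot" + i));
      }
    }

    public synchronized Slot pick(String taskId) {
      for (Slot slot : slots) {
        if (slot.runningTaskId == null) {
          slot.runningTaskId = taskId;
          return slot;
        }
      }
      throw new IllegalStateException("no free slot");
    }

    public synchronized void giveBack(Slot slot) {
      // Identity comparison: only the tracker that handed the slot out may free it.
      // Throwing here is this sketch's choice, not necessarily the PR's.
      if (slot.parent != this) {
        throw new IllegalArgumentException("slot belongs to a different tracker");
      }
      slot.runningTaskId = null;
    }
  }
}
```

Without the identity check, returning a slot to the wrong tracker would silently mark it free, which is the kind of test that passes for the wrong reason.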





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1190490704


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    StorageSlot[] slots = new StorageSlot[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      final File slotDirectory = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+      slots[i] = new StorageSlot(slotDirectory, sizePerSlot);
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slots);
   }
 
+  /**
+   * The base task dirs, this field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced.  All of the tasks were just splatted together
+   * into one directory.  If we want to be able to restore the tasks, we need to be able to find them
+   * at the old locations and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  public TaskStorageDirTracker(List<File> baseTaskDirs, StorageSlot[] slots)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = slots;
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              slot
           );
         }
       }
     }
   }
 
-  public File pickBaseDir(String taskId)
+  public synchronized StorageSlot pickStorageSlot(String taskId)
   {
-    if (baseTaskDirs.length == 1) {
-      return baseTaskDirs[0];
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (StorageSlot slot : slots) {
+      if (slot.runningTaskId != null && slot.runningTaskId.equals(taskId)) {
+        return slot;
+      }
     }
 
-    // if the task directory already exists, we want to give it precedence, so check.
-    for (File baseTaskDir : baseTaskDirs) {
-      if (new File(baseTaskDir, taskId).exists()) {
-        return baseTaskDir;
+    // if it doesn't exist, pick one round-robin and ensure it is unused.
+    for (int i = 0; i < slots.length; ++i) {
+      // This can be negative, so abs() it.
+      final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % slots.length);
+      final StorageSlot candidateSlot = slots[currIncrement % slots.length];
+      if (candidateSlot.runningTaskId != null) {
+        continue;
       }
+      candidateSlot.runningTaskId = taskId;
+      return candidateSlot;
     }
+    throw new ISE("Unable to pick a free slot, this should never happen, slot status [%s].", Arrays.toString(slots));
+  }
 
-    // if it doesn't exist, pick one round-robin and return.  This can be negative, so abs() it
-    final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % baseTaskDirs.length);
-    return baseTaskDirs[currIncrement % baseTaskDirs.length];
+  public synchronized void returnStorageSlot(StorageSlot slot)
+  {
+    slot.runningTaskId = null;
   }
 
   @Nullable
-  public File findExistingTaskDir(String taskId)
+  public synchronized File findExistingTaskDir(String taskId)
   {
+    File candidateLocation = null;
     if (baseTaskDirs.length == 1) {
-      return new File(baseTaskDirs[0], taskId);
+      candidateLocation = new File(baseTaskDirs[0], taskId);
+    } else {
+      for (File baseTaskDir : baseTaskDirs) {
+        File maybeExists = new File(baseTaskDir, taskId);
+        if (maybeExists.exists()) {
+          candidateLocation = maybeExists;
+          break;
+        }
+      }
     }
 
-    for (File baseTaskDir : baseTaskDirs) {
-      final File candidateLocation = new File(baseTaskDir, taskId);
-      if (candidateLocation.exists()) {
-        return candidateLocation;
+    if (candidateLocation != null && candidateLocation.exists()) {
+      // task exists at old location, relocate to a "good" slot location and return that.
+      final StorageSlot taskSlot = pickStorageSlot(taskId);
+      final File pickedLocation = new File(taskSlot.getDirectory(), taskId);
+      if (candidateLocation.renameTo(pickedLocation)) {
+        taskSlot.runningTaskId = taskId;
+        return pickedLocation;
+      } else {
+        throw new ISE("Unable to relocate task ([%s] -> [%s])", candidateLocation, pickedLocation);

Review Comment:
   Good catch, I need tests that cover this migration path and validate that.
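
As a rough idea of what a test for the migration path could exercise, the sketch below moves a task directory from the old flat layout (`baseDir/taskId`) into a slot directory with `File.renameTo`, the same JDK call used in the quoted hunk. `TaskDirMigrationSketch`, `relocate` and `demo` are hypothetical names, and the temp-directory setup is only an assumption about how such a test might be arranged.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

public class TaskDirMigrationSketch {
  /**
   * Moves baseDir/taskId to slotDir/taskId. Returns the new location,
   * or null when there is nothing at the old location to migrate.
   */
  public static File relocate(File baseDir, File slotDir, String taskId) {
    File oldLocation = new File(baseDir, taskId);
    File newLocation = new File(slotDir, taskId);
    if (!oldLocation.exists()) {
      return null;
    }
    if (!oldLocation.renameTo(newLocation)) {
      throw new IllegalStateException(
          "Unable to relocate task ([" + oldLocation + "] -> [" + newLocation + "])"
      );
    }
    return newLocation;
  }

  /** Builds a legacy layout in a temp directory and checks that the move happened. */
  public static boolean demo() {
    try {
      File base = Files.createTempDirectory("baseTaskDir").toFile();
      File slot = new File(base, "slot0");
      File legacy = new File(base, "task-abc");
      if (!slot.mkdirs() || !legacy.mkdirs()) {
        throw new IOException("could not set up temp directories");
      }
      File moved = relocate(base, slot, "task-abc");
      return moved != null && moved.isDirectory() && !legacy.exists();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```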





[GitHub] [druid] cheddar merged pull request #14239: Be able to load segments on Peons

Posted by "cheddar (via GitHub)" <gi...@apache.org>.
cheddar merged PR #14239:
URL: https://github.com/apache/druid/pull/14239




[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192105597


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -477,6 +429,8 @@ public TaskStatus call()
                         portFinder.markPortUnused(tlsChildPort);
                       }
 
+                      getTracker().returnStorageSlot(storageSlot);

Review Comment:
   Hrm, actually, I think I got the order wrong in BaseRestorableTaskRunner.  If the slot is not returned because the files are still there, the MM will eventually fill up with zombie tasks and become unschedulable, and it will be really hard to figure out how it got into that state.  If the slot is returned even when the directory cannot be deleted, we will maybe, possibly, get a disk-full error, which at least points at disk usage issues rather than at zombies and jobs that silently don't run.
   
   Fwiw, here in ForkingTaskRunner, I placed the call after the ports are returned.  Port availability is what currently indicates whether there are task slots available, and the ports are returned before the task dir is deleted.
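
The ordering argument can be sketched as follows: release the slot first, then attempt the directory cleanup, so that a failed delete cannot strand the slot. `SlotReturnOrderSketch`, `SlotPool`, `Cleanup` and `finishTask` are hypothetical names for illustration and do not correspond to code in ForkingTaskRunner or BaseRestorableTaskRunner.

```java
public class SlotReturnOrderSketch {
  /** Hypothetical cleanup step that may fail, e.g. deleting a task directory. */
  public interface Cleanup {
    void run() throws Exception;
  }

  /** Hypothetical counter of free task slots. */
  public static final class SlotPool {
    private int free;

    public SlotPool(int capacity) {
      this.free = capacity;
    }

    public synchronized void acquire() {
      if (free == 0) {
        throw new IllegalStateException("no free slots");
      }
      free--;
    }

    public synchronized void release() {
      free++;
    }

    public synchronized int free() {
      return free;
    }
  }

  public static void finishTask(SlotPool pool, Cleanup deleteTaskDir) {
    // Return the slot before touching the disk: a failed delete should show up
    // later as a disk usage problem, not as a worker that can never schedule again.
    pool.release();
    try {
      deleteTaskDir.run();
    }
    catch (Exception e) {
      System.err.println("Failed to delete task directory: " + e);
    }
  }
}
```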





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1190489402


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    StorageSlot[] slots = new StorageSlot[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      final File slotDirectory = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+      slots[i] = new StorageSlot(slotDirectory, sizePerSlot);
+    }

Review Comment:
   ??  We *do* expect more slots than storage directories.  I don't expect more storage directories than slots, but if that's true, it just means that some of those storage locations won't be used.  That's a great test case to cover.
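
Both cases can be checked with a small sketch that reproduces the index arithmetic from the loop in the quoted hunk (`i % baseTaskDirs.size()` picks the directory, `i / baseTaskDirs.size()` picks the slot number within it). `SlotLayoutSketch` and `slotDirs` are hypothetical names, and the `/a`, `/b`, `/c` paths are made up for illustration.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class SlotLayoutSketch {
  /** Returns the directory each slot would get, in slot order. */
  public static List<File> slotDirs(List<File> baseTaskDirs, int numSlots) {
    List<File> result = new ArrayList<>();
    for (int i = 0; i < numSlots; ++i) {
      final int whichDir = i % baseTaskDirs.size();
      final int dirUsageCount = i / baseTaskDirs.size();
      result.add(new File(baseTaskDirs.get(whichDir), "slot" + dirUsageCount));
    }
    return result;
  }
}
```

With base dirs `/a`, `/b` and 3 slots, the slots land in `/a/slot0`, `/b/slot0`, `/a/slot1`. With base dirs `/a`, `/b`, `/c` and 2 slots, only `/a/slot0` and `/b/slot0` are produced, so `/c` is never used.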





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192097495


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -23,92 +23,240 @@
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
+  private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    final List<String> basePaths = workerConfig.getBaseTaskDirs();
+    final List<File> baseTaskDirs;
+
+    if (basePaths == null) {
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
-      final List<String> basePaths = workerConfig.getBaseTaskDirs();
-      if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
-      }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
+      baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
+    }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+    if (slotsPerBaseTaskDir == 0) {
+      slotsPerBaseTaskDir = 1;
+    } else if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
     }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    File[] slotDirs = new File[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      slotDirs[i] = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
   }
 
+  /**
+   * The base task dirs, this field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced.  All of the tasks were just splatted together
+   * into one directory.  If we want to be able to restore the tasks, we need to be able to find them
+   * at the old locations and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long sizePerSlot)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = new StorageSlot[slotDirs.length];
+    for (int i = 0; i < slotDirs.length; ++i) {
+      slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+    }
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",

Review Comment:
   So, if it already existed, the mkdirp would have succeeded regardless of permissions.  The reason for "likely does not exist" is to tell whoever sees this that they shouldn't actually expect it to exist, but they should make it exist.
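
The same behavior can be seen with the plain JDK. The sketch below uses `Files.createDirectories` as a stand-in for Druid's `FileUtils.mkdirp` (an assumption about equivalence, not a claim about Druid's implementation): it does not throw when the directory is already there, so an exception means the directory was missing and could not be created. `EnsureDirectorySketch`, `ensure` and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class EnsureDirectorySketch {
  /** Creates dir and any missing parents; a no-op when dir already exists. */
  public static void ensure(Path dir) {
    try {
      Files.createDirectories(dir);
    }
    catch (IOException e) {
      // Reaching this branch means the directory was absent and could not be made.
      throw new IllegalStateException(
          "directory [" + dir + "] likely does not exist, please ensure it exists and the user has permissions.",
          e
      );
    }
  }

  /** Calls ensure twice on the same path to show the second call is harmless. */
  public static boolean demo() {
    try {
      Path base = Files.createTempDirectory("slotDirs");
      Path slot = base.resolve("baseTaskDir").resolve("slot0");
      ensure(slot);
      ensure(slot);
      return Files.isDirectory(slot);
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```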





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192100895


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -24,91 +24,202 @@
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
+    final List<File> baseTaskDirs;
     if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
       final List<String> basePaths = workerConfig.getBaseTaskDirs();
       if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+        baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
+      } else {
+        baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
       }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
     }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = Math.max(1, numSlots / baseTaskDirs.size());
+    if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
+    }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;

Review Comment:
   Yes, that is correct.  I personally struggle with the balance between going into the weeds to explain every nuance (which makes the docs seem impenetrable) and just saying what it does and letting the nuance "follow" from the explained logic.
   
   If you have a suggestion for wording, I'll probably accept it.  But I'm prone to leave the docs as is if it's entirely up to me.
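
A worked example may help with the wording. The sketch below copies the arithmetic from the hunk quoted above into a standalone method; `SlotSizingSketch` and `sizePerSlot` are hypothetical names, and the byte counts are made up.

```java
public class SlotSizingSketch {
  /** Mirrors the slotsPerBaseTaskDir / sizePerSlot computation from the quoted hunk. */
  public static long sizePerSlot(int numBaseTaskDirs, int numSlots, long dirSize) {
    int slotsPerBaseTaskDir = Math.max(1, numSlots / numBaseTaskDirs);
    if (numSlots % numBaseTaskDirs > 0) {
      // Uneven split: size every slot as if each directory hosted the larger count.
      ++slotsPerBaseTaskDir;
    }
    return dirSize / slotsPerBaseTaskDir;
  }
}
```

With 2 base dirs, 4 slots and a dir size of 600 bytes, each dir hosts 2 slots of 300 bytes. With 5 slots, the dirs host 3 and 2 slots, every slot is sized at 600 / 3 = 200 bytes, and the dir with only 2 slots leaves 200 bytes unassigned.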





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14239: Be able to load segments on Peons

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14239:
URL: https://github.com/apache/druid/pull/14239#discussion_r1192093915


##########
server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java:
##########
@@ -80,11 +83,35 @@ public int getCapacity()
     return capacity;
   }
 
+  public WorkerConfig setCapacity(int capacity)
+  {
+    this.capacity = capacity;
+    return this;
+  }
+
+  public long getBaseTaskDirSize()
+  {
+    return baseTaskDirSize;
+  }
+
+  public WorkerConfig setBaseTaskDirSize(long baseTaskDirSize)
+  {
+    this.baseTaskDirSize = baseTaskDirSize;
+    return this;
+  }
+
   public List<String> getBaseTaskDirs()
   {
     return baseTaskDirs;
   }
 
+  public WorkerConfig setBaseTaskDirs(List<String> baseTaskDirs)

Review Comment:
   There are a number of different patterns for configs.  For this one, the options were:
   
   1. Make the fields public
   2. Add setters
   3. Add "with" methods that build new objects
   
   I didn't want to do (1) and took the lazy route of (2).  The less-lazy route of (3) is probably better, though, because it protects against production code mutating the object at some point and causing unexpected side effects.
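
For comparison, option (3) could look roughly like the sketch below. `WorkerSettingsSketch` is a hypothetical class that borrows the three field names from the quoted hunk; it is not Druid's `WorkerConfig` and omits the JSON annotations a real config class would carry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class WorkerSettingsSketch {
  private final int capacity;
  private final long baseTaskDirSize;
  private final List<String> baseTaskDirs;

  public WorkerSettingsSketch(int capacity, long baseTaskDirSize, List<String> baseTaskDirs) {
    this.capacity = capacity;
    this.baseTaskDirSize = baseTaskDirSize;
    // Defensive copy so later changes to the caller's list cannot leak in.
    this.baseTaskDirs = Collections.unmodifiableList(new ArrayList<>(baseTaskDirs));
  }

  public int getCapacity() {
    return capacity;
  }

  public long getBaseTaskDirSize() {
    return baseTaskDirSize;
  }

  public List<String> getBaseTaskDirs() {
    return baseTaskDirs;
  }

  // Each "with" method returns a new object and leaves the receiver untouched.
  public WorkerSettingsSketch withCapacity(int newCapacity) {
    return new WorkerSettingsSketch(newCapacity, baseTaskDirSize, baseTaskDirs);
  }

  public WorkerSettingsSketch withBaseTaskDirSize(long newSize) {
    return new WorkerSettingsSketch(capacity, newSize, baseTaskDirs);
  }

  public WorkerSettingsSketch withBaseTaskDirs(List<String> newDirs) {
    return new WorkerSettingsSketch(capacity, baseTaskDirSize, newDirs);
  }
}
```

A test can then derive a variant, for example `base.withCapacity(4)`, without any risk of changing the `base` instance that other code holds.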


