Posted to commits@druid.apache.org by "imply-cheddar (via GitHub)" <gi...@apache.org> on 2023/04/11 08:53:12 UTC

[GitHub] [druid] imply-cheddar opened a new pull request, #14063: Make the tasks run with only a single directory

imply-cheddar opened a new pull request, #14063:
URL: https://github.com/apache/druid/pull/14063

   There was a change that tried to get indexing to run on multiple disks. It made a bunch of changes to how tasks run, effectively hiding the "safe" directory for tasks to write files into from the task code itself, which made it extremely difficult to do anything correctly inside of a task.
   
   This change reverts those changes inside of the tasks, so that only the task runners decide which mount points are used for storing task-related files.
   
   It adds the config druid.worker.baseTaskDirs which can be used by the task runners to know which directories they should schedule tasks inside of. The TaskConfig remains the authoritative source of configuration for where and how an individual task should be operating.
   
   #### Release note
   
   Tasks can now run using multiple different mount points for temporary storage.  Set `druid.worker.baseTaskDirs` to an array of locations to enable!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1163272116


##########
services/src/main/java/org/apache/druid/cli/CliPeon.java:
##########
@@ -144,10 +143,10 @@ public class CliPeon extends GuiceRunnable
 {
   @SuppressWarnings("WeakerAccess")
   @Required
-  @Arguments(description = "baseTaskDirPath taskId attemptId")
+  @Arguments(description = "taskDirPath attemptId")
   public List<String> taskAndStatusFile;
 
-  // path to the base task Directory
+  // path to the task Directory
   private String taskDirPath;

Review Comment:
   Yeah, you caught a bug where I wasn't properly overriding this when forking processes.  That's fixed now.





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1164687574


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -19,81 +19,94 @@
 
 package org.apache.druid.indexing.common;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
-import javax.inject.Inject;
 import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class TaskStorageDirTracker
 {
-  private int taskDirIndex = 0;
-
-  private final List<File> baseTaskDirs = new ArrayList<>();
-
-  private final Map<String, File> taskToTempDirMap = new HashMap<>();
-
-  @Inject
-  public TaskStorageDirTracker(final TaskConfig taskConfig)
-  {
-    this(taskConfig.getBaseTaskDirPaths());
-  }
-
-  @VisibleForTesting
-  public TaskStorageDirTracker(final List<String> baseTaskDirPaths)
+  public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    for (String baseTaskDirPath : baseTaskDirPaths) {
-      baseTaskDirs.add(new File(baseTaskDirPath));
+    if (workerConfig == null) {
+      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    } else {
+      final List<String> basePaths = workerConfig.getBaseTaskDirs();
+      if (basePaths == null) {
+        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      }
+      return new TaskStorageDirTracker(
+          basePaths.stream().map(File::new).collect(Collectors.toList())
+      );
     }
   }
 
-  public File getTaskDir(String taskId)
-  {
-    return new File(getBaseTaskDir(taskId), taskId);
-  }
+  private final File[] baseTaskDirs;
+  private final AtomicInteger iterationCounter = new AtomicInteger(0);
 
-  public File getTaskWorkDir(String taskId)
+  public TaskStorageDirTracker(List<File> baseTaskDirs)
   {
-    return new File(getTaskDir(taskId), "work");
+    this.baseTaskDirs = baseTaskDirs.toArray(new File[]{});
   }
 
-  public File getTaskTempDir(String taskId)
+  @LifecycleStart
+  public void ensureDirectories()
   {
-    return new File(getTaskDir(taskId), "temp");
+    for (File baseTaskDir : baseTaskDirs) {
+      if (!baseTaskDir.exists()) {
+        try {
+          FileUtils.mkdirp(baseTaskDir);
+        }
+        catch (IOException e) {
+          throw new ISE(
+              e,
+              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              baseTaskDir
+          );
+        }
+      }
+    }
   }
 
-  public List<File> getBaseTaskDirs()
+  public File pickBaseDir(String taskId) throws IOException
   {
-    return baseTaskDirs;
-  }
+    if (baseTaskDirs.length == 1) {
+      return baseTaskDirs[0];
+    }
 
-  public synchronized File getBaseTaskDir(final String taskId)
-  {
-    if (!taskToTempDirMap.containsKey(taskId)) {
-      addTask(taskId, baseTaskDirs.get(taskDirIndex));
-      taskDirIndex = (taskDirIndex + 1) % baseTaskDirs.size();
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (File baseTaskDir : baseTaskDirs) {
+      if (new File(baseTaskDir, taskId).exists()) {
+        return baseTaskDir;
+      }
     }
 
-    return taskToTempDirMap.get(taskId);
+    // if it doesn't exist, pick one round-robin and return.  This will roll negative, but that's okay because we
+    // are always modding it.
+    final int currIncrement = iterationCounter.getAndIncrement() % baseTaskDirs.length;
+    return baseTaskDirs[currIncrement % baseTaskDirs.length];

Review Comment:
   Good catch. I adjusted it to initialize to a negative number, so that the tests also cover negative counter values, and added an absolute value.
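
The adjustment described in this comment can be sketched roughly as follows. This is a minimal standalone illustration, not the code merged in the PR: the class `RoundRobinDirPicker` and its methods are hypothetical names, and the sketch assumes the index is computed by taking the remainder first and the absolute value second. The other order would not be safe, because `Math.abs(Integer.MIN_VALUE)` is itself negative.

```java
import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the TaskStorageDirTracker from the PR.
final class RoundRobinDirPicker
{
  private final File[] baseDirs;
  // Start at the most negative value so that negative counter values are exercised immediately.
  private final AtomicInteger counter = new AtomicInteger(Integer.MIN_VALUE);

  RoundRobinDirPicker(List<File> baseDirs)
  {
    if (baseDirs.isEmpty()) {
      throw new IllegalArgumentException("at least one base directory is required");
    }
    this.baseDirs = baseDirs.toArray(new File[0]);
  }

  // Returns an index in [0, length) for any int counter value.
  static int indexFor(int counterValue, int length)
  {
    // Remainder first: the result lies in (-length, length), so Math.abs cannot overflow.
    return Math.abs(counterValue % length);
  }

  // Picks the next base directory; safe to call from multiple threads.
  File pick()
  {
    return baseDirs[indexFor(counter.getAndIncrement(), baseDirs.length)];
  }
}
```

`Math.floorMod(counterValue, length)` would be an alternative that also stays in range and keeps the rotation order unchanged when the counter crosses from negative to positive values.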





[GitHub] [druid] cheddar merged pull request #14063: Make the tasks run with only a single directory

Posted by "cheddar (via GitHub)" <gi...@apache.org>.
cheddar merged PR #14063:
URL: https://github.com/apache/druid/pull/14063




[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1162547304


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -202,17 +190,30 @@
     return baseDir;
   }
 
-  @Deprecated
-  @JsonProperty("baseTaskDir")
-  public String getBaseTaskDirPath()
+  @JsonProperty
+  public File getBaseTaskDir()
   {
-    return baseTaskDirPath;
+    return baseTaskDir;
   }
 
-  @JsonProperty
-  public List<String> getBaseTaskDirPaths()
+  public File getTaskDir(String taskId)
   {
-    return baseTaskDirPaths;
+    return new File(baseTaskDir, IdUtils.validateId("task ID", taskId));

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/31)
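
What `IdUtils.validateId` checks is not shown in this thread. As a generic illustration of how this class of alert is commonly addressed, the sketch below resolves a user-supplied name under a base directory and rejects anything that is not a direct child of it. The class `SafeChildPath` and its `resolve` method are hypothetical and are not part of Druid.

```java
import java.io.File;
import java.nio.file.Path;

// Hypothetical helper; not Druid code.
final class SafeChildPath
{
  // Resolves childName under baseDir, rejecting names that would escape baseDir.
  static File resolve(File baseDir, String childName)
  {
    if (childName == null || childName.isEmpty()) {
      throw new IllegalArgumentException("child name must not be empty");
    }
    Path base = baseDir.toPath().toAbsolutePath().normalize();
    Path child = base.resolve(childName).normalize();
    // Require a direct child: this rejects "..", nested paths, and absolute paths.
    if (!base.equals(child.getParent())) {
      throw new IllegalArgumentException("illegal path component: " + childName);
    }
    return child.toFile();
  }
}
```

A caller would use `SafeChildPath.resolve(baseTaskDir, taskId)` in place of `new File(baseTaskDir, taskId)`. Whether a static analyzer recognizes such a check as a sanitizer depends on the analyzer.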





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1163484206


##########
docs/configuration/index.md:
##########
@@ -1457,6 +1457,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
 |`druid.worker.ip`|The IP of the worker.|localhost|
 |`druid.worker.version`|Version identifier for the MiddleManager. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.indexer.runner.minWorkerVersion`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons.|0|
 |`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of CPUs on the machine - 1|
+|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`.  If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used.  Example: `druid.worker.baseTaskDirPaths=[\"PATH1\",\"PATH2\",...]`.|null|

Review Comment:
   typo: baseTaskDirPaths -> baseTaskDirs





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1163251651


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -118,18 +121,10 @@
   @JsonProperty
   private final boolean encapsulatedTask;
 
-  @Deprecated
-  @JsonProperty("baseTaskDir")
-  private final String baseTaskDirPath;
-
-  // Use multiple base files for tasks instead of a single one
-  @JsonProperty
-  private final List<String> baseTaskDirPaths;
-
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
-      @Deprecated @JsonProperty("baseTaskDir") String baseTaskDirPath,
+      @JsonProperty("baseTaskDir") String baseTaskDir,

Review Comment:
   No, it should be overridden on the TaskConfig object that is sent to the tasks.  Let me make sure that I'm doing that properly; I might've overlooked it as I got bogged down in the degree of sprawl that this PR has.





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1164721389


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -19,81 +19,96 @@
 
 package org.apache.druid.indexing.common;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
-import javax.inject.Inject;
+import javax.annotation.Nullable;
 import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class TaskStorageDirTracker
 {
-  private int taskDirIndex = 0;
-
-  private final List<File> baseTaskDirs = new ArrayList<>();
-
-  private final Map<String, File> taskToTempDirMap = new HashMap<>();
-
-  @Inject
-  public TaskStorageDirTracker(final TaskConfig taskConfig)
-  {
-    this(taskConfig.getBaseTaskDirPaths());
-  }
-
-  @VisibleForTesting
-  public TaskStorageDirTracker(final List<String> baseTaskDirPaths)
+  public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    for (String baseTaskDirPath : baseTaskDirPaths) {
-      baseTaskDirs.add(new File(baseTaskDirPath));
+    if (workerConfig == null) {
+      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    } else {
+      final List<String> basePaths = workerConfig.getBaseTaskDirs();
+      if (basePaths == null) {
+        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      }
+      return new TaskStorageDirTracker(
+          basePaths.stream().map(File::new).collect(Collectors.toList())
+      );
     }
   }
 
-  public File getTaskDir(String taskId)
-  {
-    return new File(getBaseTaskDir(taskId), taskId);
-  }
+  private final File[] baseTaskDirs;
+  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+  private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public File getTaskWorkDir(String taskId)
+  public TaskStorageDirTracker(List<File> baseTaskDirs)
   {
-    return new File(getTaskDir(taskId), "work");
+    this.baseTaskDirs = baseTaskDirs.toArray(new File[]{});
   }
 
-  public File getTaskTempDir(String taskId)
+  @LifecycleStart
+  public void ensureDirectories()
   {
-    return new File(getTaskDir(taskId), "temp");
+    for (File baseTaskDir : baseTaskDirs) {
+      if (!baseTaskDir.exists()) {
+        try {
+          FileUtils.mkdirp(baseTaskDir);
+        }
+        catch (IOException e) {
+          throw new ISE(
+              e,
+              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              baseTaskDir
+          );
+        }
+      }
+    }
   }
 
-  public List<File> getBaseTaskDirs()
+  public File pickBaseDir(String taskId) throws IOException
   {
-    return baseTaskDirs;
-  }
+    if (baseTaskDirs.length == 1) {
+      return baseTaskDirs[0];
+    }
 
-  public synchronized File getBaseTaskDir(final String taskId)
-  {
-    if (!taskToTempDirMap.containsKey(taskId)) {
-      addTask(taskId, baseTaskDirs.get(taskDirIndex));
-      taskDirIndex = (taskDirIndex + 1) % baseTaskDirs.size();
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (File baseTaskDir : baseTaskDirs) {
+      if (new File(baseTaskDir, taskId).exists()) {

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4790)





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1163271697


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -118,18 +121,10 @@
   @JsonProperty
   private final boolean encapsulatedTask;
 
-  @Deprecated
-  @JsonProperty("baseTaskDir")
-  private final String baseTaskDirPath;
-
-  // Use multiple base files for tasks instead of a single one
-  @JsonProperty
-  private final List<String> baseTaskDirPaths;
-
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
-      @Deprecated @JsonProperty("baseTaskDir") String baseTaskDirPath,
+      @JsonProperty("baseTaskDir") String baseTaskDir,

Review Comment:
   Okay, just fixed this by setting the config from the ForkingTaskRunner and using a different build method from the ThreadingTaskRunner.





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1163484441


##########
docs/configuration/index.md:
##########
@@ -1577,11 +1577,11 @@ then the value from the configuration below is used:
 |--------|-----------|-------|
 |`druid.worker.version`|Version identifier for the Indexer.|0|
 |`druid.worker.capacity`|Maximum number of tasks the Indexer can accept.|Number of available processors - 1|
+|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`.  If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used.  Example: `druid.worker.baseTaskDirPaths=[\"PATH1\",\"PATH2\",...]`.|null|

Review Comment:
   typo: baseTaskDirPaths -> baseTaskDirs





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1164072588


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -19,81 +19,94 @@
 
 package org.apache.druid.indexing.common;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 
-import javax.inject.Inject;
 import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class TaskStorageDirTracker
 {
-  private int taskDirIndex = 0;
-
-  private final List<File> baseTaskDirs = new ArrayList<>();
-
-  private final Map<String, File> taskToTempDirMap = new HashMap<>();
-
-  @Inject
-  public TaskStorageDirTracker(final TaskConfig taskConfig)
-  {
-    this(taskConfig.getBaseTaskDirPaths());
-  }
-
-  @VisibleForTesting
-  public TaskStorageDirTracker(final List<String> baseTaskDirPaths)
+  public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    for (String baseTaskDirPath : baseTaskDirPaths) {
-      baseTaskDirs.add(new File(baseTaskDirPath));
+    if (workerConfig == null) {
+      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    } else {
+      final List<String> basePaths = workerConfig.getBaseTaskDirs();
+      if (basePaths == null) {
+        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      }
+      return new TaskStorageDirTracker(
+          basePaths.stream().map(File::new).collect(Collectors.toList())
+      );
     }
   }
 
-  public File getTaskDir(String taskId)
-  {
-    return new File(getBaseTaskDir(taskId), taskId);
-  }
+  private final File[] baseTaskDirs;
+  private final AtomicInteger iterationCounter = new AtomicInteger(0);
 
-  public File getTaskWorkDir(String taskId)
+  public TaskStorageDirTracker(List<File> baseTaskDirs)
   {
-    return new File(getTaskDir(taskId), "work");
+    this.baseTaskDirs = baseTaskDirs.toArray(new File[]{});
   }
 
-  public File getTaskTempDir(String taskId)
+  @LifecycleStart
+  public void ensureDirectories()
   {
-    return new File(getTaskDir(taskId), "temp");
+    for (File baseTaskDir : baseTaskDirs) {
+      if (!baseTaskDir.exists()) {
+        try {
+          FileUtils.mkdirp(baseTaskDir);
+        }
+        catch (IOException e) {
+          throw new ISE(
+              e,
+              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              baseTaskDir
+          );
+        }
+      }
+    }
   }
 
-  public List<File> getBaseTaskDirs()
+  public File pickBaseDir(String taskId) throws IOException
   {
-    return baseTaskDirs;
-  }
+    if (baseTaskDirs.length == 1) {
+      return baseTaskDirs[0];
+    }
 
-  public synchronized File getBaseTaskDir(final String taskId)
-  {
-    if (!taskToTempDirMap.containsKey(taskId)) {
-      addTask(taskId, baseTaskDirs.get(taskDirIndex));
-      taskDirIndex = (taskDirIndex + 1) % baseTaskDirs.size();
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (File baseTaskDir : baseTaskDirs) {
+      if (new File(baseTaskDir, taskId).exists()) {
+        return baseTaskDir;
+      }
     }
 
-    return taskToTempDirMap.get(taskId);
+    // if it doesn't exist, pick one round-robin and return.  This will roll negative, but that's okay because we
+    // are always modding it.
+    final int currIncrement = iterationCounter.getAndIncrement() % baseTaskDirs.length;
+    return baseTaskDirs[currIncrement % baseTaskDirs.length];

Review Comment:
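   In Java, a negative number modulo a positive number is negative (or zero), so the comment above does not hold: once the counter rolls over, the computed index can be negative.
   Even though overflow is unlikely to happen, it might be nice to add a check here.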
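
A small standalone illustration of the concern, using only the JDK (the class name `ModuloSignDemo` is hypothetical): the `%` operator takes the sign of the dividend, and an `int` counter wraps to `Integer.MIN_VALUE` after `Integer.MAX_VALUE`, so an unchecked `counter % length` could eventually produce a negative array index.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the PR.
final class ModuloSignDemo
{
  // Plain remainder: negative when the dividend is negative and not a multiple of the divisor.
  static int remainder(int value, int length)
  {
    return value % length;
  }

  // Value the counter holds after one more increment past Integer.MAX_VALUE.
  static int afterOverflow()
  {
    AtomicInteger counter = new AtomicInteger(Integer.MAX_VALUE);
    return counter.incrementAndGet();
  }

  public static void main(String[] args)
  {
    System.out.println(remainder(-7, 3));               // -1
    System.out.println(afterOverflow());                // -2147483648
    System.out.println(remainder(afterOverflow(), 3));  // -2
  }
}
```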





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1163643277


##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -284,8 +286,8 @@
 
       try {
         FileUtils.writeAtomically(
-            getAssignedTaskFile(task.getId()),
-            getTmpTaskDir(task.getId()),
+            new File(getAssignedTaskDir(), task.getId()),

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/36)



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -401,7 +381,7 @@
   private void cleanupAssignedTask(Task task)
   {
     assignedTasks.remove(task.getId());
-    File taskFile = getAssignedTaskFile(task.getId());
+    File taskFile = new File(getAssignedTaskDir(), task.getId());

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/37)



##########
indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java:
##########
@@ -478,7 +450,7 @@
 
       try {
         FileUtils.writeAtomically(
-            getCompletedTaskFile(taskId), getTmpTaskDir(taskId),
+            new File(getCompletedTaskDir(), taskId), getTmpTaskDir(),

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/38)



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java:
##########
@@ -155,8 +156,21 @@
                         @Override
                         public TaskStatus call()
                         {
+                          final File baseDirForTask;
+                          try {
+                            baseDirForTask = getTracker().pickBaseDir(task.getId());
+                          }
+                          catch (IOException e) {
+                            LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
+                            return TaskStatus.failure(
+                                task.getId(),
+                                StringUtils.format("Could not schedule due to error [%s]", e.getMessage())
+                            );
+
+                          }
+                          final File taskDir = new File(baseDirForTask, task.getId());

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4780)



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java:
##########
@@ -19,81 +19,67 @@
 
 package org.apache.druid.indexing.common;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 
-import javax.inject.Inject;
 import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
 
 public class TaskStorageDirTracker
 {
-  private int taskDirIndex = 0;
-
-  private final List<File> baseTaskDirs = new ArrayList<>();
-
-  private final Map<String, File> taskToTempDirMap = new HashMap<>();
-
-  @Inject
-  public TaskStorageDirTracker(final TaskConfig taskConfig)
-  {
-    this(taskConfig.getBaseTaskDirPaths());
-  }
-
-  @VisibleForTesting
-  public TaskStorageDirTracker(final List<String> baseTaskDirPaths)
+  public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    for (String baseTaskDirPath : baseTaskDirPaths) {
-      baseTaskDirs.add(new File(baseTaskDirPath));
+    if (workerConfig == null) {
+      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    } else {
+      final List<String> basePaths = workerConfig.getBaseTaskDirs();
+      if (basePaths == null) {
+        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+      }
+      return new TaskStorageDirTracker(
+          basePaths.stream().map(File::new).collect(Collectors.toList())
+      );
     }
   }
 
-  public File getTaskDir(String taskId)
-  {
-    return new File(getBaseTaskDir(taskId), taskId);
-  }
+  private final List<File> baseTaskDirs;
 
-  public File getTaskWorkDir(String taskId)
+  public TaskStorageDirTracker(List<File> baseTaskDirs)
   {
-    return new File(getTaskDir(taskId), "work");
+    this.baseTaskDirs = baseTaskDirs;
   }
 
-  public File getTaskTempDir(String taskId)
+  public File pickBaseDir(String taskId) throws IOException
   {
-    return new File(getTaskDir(taskId), "temp");
-  }
-
-  public List<File> getBaseTaskDirs()
-  {
-    return baseTaskDirs;
-  }
-
-  public synchronized File getBaseTaskDir(final String taskId)
-  {
-    if (!taskToTempDirMap.containsKey(taskId)) {
-      addTask(taskId, baseTaskDirs.get(taskDirIndex));
-      taskDirIndex = (taskDirIndex + 1) % baseTaskDirs.size();
+    File leastUsed = null;
+    long numEntries = Long.MAX_VALUE;
+
+    for (File baseTaskDir : baseTaskDirs) {
+      if (new File(baseTaskDir, taskId).exists()) {

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4778)
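
The hunk above is cut off partway through `pickBaseDir`. As a rough sketch of the idea it appears to implement (reuse the base directory that already holds the task, otherwise choose the one with the fewest entries), something like the following could work. The class name `LeastUsedDirPicker`, the static signature, and everything after the `exists()` check are assumptions made for illustration, not the code from the PR.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical stand-in for TaskStorageDirTracker; not the PR's implementation.
class LeastUsedDirPicker
{
  static File pickBaseDir(List<File> baseTaskDirs, String taskId) throws IOException
  {
    File leastUsed = null;
    long numEntries = Long.MAX_VALUE;

    for (File baseTaskDir : baseTaskDirs) {
      // A task that already has a directory keeps its base dir (e.g. after a restart).
      if (new File(baseTaskDir, taskId).exists()) {
        return baseTaskDir;
      }

      // Assumption: a missing base dir is created and then counts as empty.
      Files.createDirectories(baseTaskDir.toPath());

      final long entries;
      try (Stream<Path> children = Files.list(baseTaskDir.toPath())) {
        entries = children.count();
      }

      // Strict comparison, so ties go to the earlier directory in the list.
      if (entries < numEntries) {
        numEntries = entries;
        leastUsed = baseTaskDir;
      }
    }

    if (leastUsed == null) {
      throw new IOException("No base task directories configured");
    }
    return leastUsed;
  }
}
```

Counting directory entries is only a cheap proxy for load; it says nothing about how much disk each task actually uses.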



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java:
##########
@@ -155,9 +155,20 @@
                 public TaskStatus call()
                 {
 
-                  final String attemptId = String.valueOf(getNextAttemptID(dirTracker, task.getId()));
-                  final String baseTaskDir = dirTracker.getBaseTaskDir(task.getId()).getAbsolutePath();
-                  final File taskDir = dirTracker.getTaskDir(task.getId());
+                  final File baseDirForTask;
+                  try {
+                    baseDirForTask = getTracker().pickBaseDir(task.getId());
+                  }
+                  catch (IOException e) {
+                    LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
+                    return TaskStatus.failure(
+                        task.getId(),
+                        StringUtils.format("Could not schedule due to error [%s]", e.getMessage())
+                    );
+                  }
+
+                  final File taskDir = new File(baseDirForTask, task.getId());

Review Comment:
   ## Uncontrolled data used in path expression
   
   This path depends on a [user-provided value](1).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4779)
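
For context on what this alert is about: the flagged expression builds a path from `task.getId()`. If one wanted to address the alert rather than dismiss it, a minimal sketch is to check that the resolved task directory stays under the chosen base directory. The helper `TaskDirGuard.resolveTaskDir` below is hypothetical and is not part of this PR or of Druid.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;

// Hypothetical helper for illustration only.
class TaskDirGuard
{
  static File resolveTaskDir(File baseDir, String taskId) throws IOException
  {
    final Path base = baseDir.toPath().toAbsolutePath().normalize();
    // normalize() collapses "." and ".." segments before the containment check.
    final Path resolved = base.resolve(taskId).normalize();

    // Reject ids that resolve outside the base dir, or to the base dir itself.
    if (!resolved.startsWith(base) || resolved.equals(base)) {
      throw new IOException("Task id [" + taskId + "] escapes base directory [" + base + "]");
    }
    return resolved.toFile();
  }
}
```

With this, `resolveTaskDir(base, "index_task_1")` returns a child of `base`, while an id such as `"../escape"` is rejected with an `IOException`.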





[GitHub] [druid] rohangarg commented on a diff in pull request #14063: Make the tasks run with only a single directory

Posted by "rohangarg (via GitHub)" <gi...@apache.org>.
rohangarg commented on code in PR #14063:
URL: https://github.com/apache/druid/pull/14063#discussion_r1162824021


##########
services/src/main/java/org/apache/druid/cli/CliPeon.java:
##########
@@ -144,10 +143,10 @@ public class CliPeon extends GuiceRunnable
 {
   @SuppressWarnings("WeakerAccess")
   @Required
-  @Arguments(description = "baseTaskDirPath taskId attemptId")
+  @Arguments(description = "taskDirPath attemptId")
   public List<String> taskAndStatusFile;
 
-  // path to the base task Directory
+  // path to the task Directory
   private String taskDirPath;

Review Comment:
   I'm having trouble understanding how this path connects to the intermediate directories the task creates. This path would have been derived from the multiple-directory selection algorithm in the task runners. However, `CliPeon` also injects a `TaskConfig` object, and that config object has no knowledge of the directory selection logic.
   Is there some place that modifies the `TaskConfig` object to override its `baseTaskDirPath` with the path given on the task launcher command line?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java:
##########
@@ -118,18 +121,10 @@
   @JsonProperty
   private final boolean encapsulatedTask;
 
-  @Deprecated
-  @JsonProperty("baseTaskDir")
-  private final String baseTaskDirPath;
-
-  // Use multiple base files for tasks instead of a single one
-  @JsonProperty
-  private final List<String> baseTaskDirPaths;
-
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
-      @Deprecated @JsonProperty("baseTaskDir") String baseTaskDirPath,
+      @JsonProperty("baseTaskDir") String baseTaskDir,

Review Comment:
   Should this still be deprecated as a JSON config for configuring task locations?





[GitHub] [druid] cheddar commented on pull request #14063: Make the tasks run with only a single directory

Posted by "cheddar (via GitHub)" <gi...@apache.org>.
cheddar commented on PR #14063:
URL: https://github.com/apache/druid/pull/14063#issuecomment-1506502602

   The CodeQL alerts are highlighting points where we take locations from configs. Changing those configs requires access to edit files on the host, which already means that the system is compromised, so the alerts are safe to ignore.

