You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2022/02/15 17:45:52 UTC

[druid] branch master updated: Add config to limit task slots for parallel indexing tasks (#12221)

This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 393e9b6  Add config to limit task slots for parallel indexing tasks (#12221)
393e9b6 is described below

commit 393e9b68a8875e07942528720a533bc26580abb8
Author: AmatyaAvadhanula <95...@users.noreply.github.com>
AuthorDate: Tue Feb 15 23:15:09 2022 +0530

    Add config to limit task slots for parallel indexing tasks (#12221)
    
    In extreme cases where many parallel indexing jobs are submitted together, it is possible
    that the `ParallelIndexSupervisorTasks` take up all slots leaving no slot to schedule
    their own sub-tasks thus stalling progress of all the indexing jobs.
    
    Key changes:
    - Add config `druid.indexer.runner.parallelIndexTaskSlotRatio` to limit the task slots
      for `ParallelIndexSupervisorTasks` per worker
    - `ratio = 1` implies supervisor tasks can use all slots on a worker if needed (default behavior)
    - `ratio = 0` nominally implies supervisor tasks cannot use any slot on a worker
       (in practice, at least 1 slot is always available to ensure progress of parallel indexing jobs)
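    - Update `ImmutableWorkerInfo.canRunTask()` to accept the ratio and enforce the parallel indexing slot limit
    - Track the slots used by supervisor tasks in `WorkerHolder` and `ZkWorker`, and pass the ratio in `WorkerSelectUtils`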
---
 docs/configuration/index.md                        |  1 +
 .../indexing/overlord/ImmutableWorkerInfo.java     | 56 +++++++++++++++++++-
 .../apache/druid/indexing/overlord/ZkWorker.java   | 14 +++++
 ...PendingTaskBasedWorkerProvisioningStrategy.java |  5 ++
 .../overlord/config/WorkerTaskRunnerConfig.java    | 18 +++++++
 .../druid/indexing/overlord/hrtr/WorkerHolder.java | 13 +++++
 .../indexing/overlord/setup/WorkerSelectUtils.java |  2 +-
 .../indexing/overlord/ImmutableWorkerInfoTest.java | 60 ++++++++++++++++++++++
 .../setup/JavaScriptWorkerSelectStrategyTest.java  |  6 ++-
 9 files changed, 170 insertions(+), 5 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 9c37d4a..cf9f2f0 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1066,6 +1066,7 @@ The following configs only apply if the Overlord is running in remote mode. For
 |--------|-----------|-------|
 |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a MiddleManager before throwing an error.|PT5M|
 |`druid.indexer.runner.minWorkerVersion`|The minimum MiddleManager version to send tasks to. |"0"|
+|`druid.indexer.runner.parallelIndexTaskSlotRatio`|The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range [0, 1]. At least one slot per worker always remains available to these tasks, even when the ratio is 0.|1|
 |`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect MiddleManagers to compress Znodes.|true|
 |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper, should be in the range of [10KiB, 2GiB). [Human-readable format](human-readable-byte.md) is supported.| 512 KiB |
 |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a MiddleManager is disconnected from Zookeeper.|PT15M|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index 2c07758..aaea3f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.worker.Worker;
 import org.joda.time.DateTime;
 
@@ -39,6 +40,7 @@ public class ImmutableWorkerInfo
 {
   private final Worker worker;
   private final int currCapacityUsed;
+  private final int currParallelIndexCapacityUsed;
   private final ImmutableSet<String> availabilityGroups;
   private final ImmutableSet<String> runningTasks;
   private final DateTime lastCompletedTaskTime;
@@ -48,6 +50,7 @@ public class ImmutableWorkerInfo
   public ImmutableWorkerInfo(
       @JsonProperty("worker") Worker worker,
       @JsonProperty("currCapacityUsed") int currCapacityUsed,
+      @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
       @JsonProperty("availabilityGroups") Set<String> availabilityGroups,
       @JsonProperty("runningTasks") Collection<String> runningTasks,
       @JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@@ -56,6 +59,7 @@ public class ImmutableWorkerInfo
   {
     this.worker = worker;
     this.currCapacityUsed = currCapacityUsed;
+    this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
     this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
     this.runningTasks = ImmutableSet.copyOf(runningTasks);
     this.lastCompletedTaskTime = lastCompletedTaskTime;
@@ -65,12 +69,25 @@ public class ImmutableWorkerInfo
   public ImmutableWorkerInfo(
       Worker worker,
       int currCapacityUsed,
+      int currParallelIndexCapacityUsed,
       Set<String> availabilityGroups,
       Collection<String> runningTasks,
       DateTime lastCompletedTaskTime
   )
   {
-    this(worker, currCapacityUsed, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
+    this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
+         runningTasks, lastCompletedTaskTime, null);
+  }
+
+  public ImmutableWorkerInfo(
+      Worker worker,
+      int currCapacityUsed,
+      Set<String> availabilityGroups,
+      Collection<String> runningTasks,
+      DateTime lastCompletedTaskTime
+  )
+  {
+    this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
   }
 
   @JsonProperty("worker")
@@ -85,6 +102,12 @@ public class ImmutableWorkerInfo
     return currCapacityUsed;
   }
 
+  @JsonProperty("currParallelIndexCapacityUsed")
+  public int getCurrParallelIndexCapacityUsed()
+  {
+    return currParallelIndexCapacityUsed;
+  }
+
   @JsonProperty("availabilityGroups")
   public Set<String> getAvailabilityGroups()
   {
@@ -119,12 +142,36 @@ public class ImmutableWorkerInfo
     return worker.getVersion().compareTo(minVersion) >= 0;
   }
 
-  public boolean canRunTask(Task task)
+  public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
   {
     return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
+            && canRunParallelIndexTask(task, parallelIndexTaskSlotRatio)
             && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
   }
 
+  private boolean canRunParallelIndexTask(Task task, double parallelIndexTaskSlotRatio)
+  {
+    if (!task.getType().equals(ParallelIndexSupervisorTask.TYPE)) {
+      return true;
+    }
+    return getWorkerParallelIndexCapacity(parallelIndexTaskSlotRatio) - getCurrParallelIndexCapacityUsed()
+           >= task.getTaskResource().getRequiredCapacity();
+
+  }
+
+  private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
+  {
+    int totalCapacity = worker.getCapacity();
+    int workerParallelIndexCapacity = (int) Math.floor(parallelIndexTaskSlotRatio * totalCapacity);
+    if (workerParallelIndexCapacity < 1) {
+      workerParallelIndexCapacity = 1;
+    }
+    if (workerParallelIndexCapacity > totalCapacity) {
+      workerParallelIndexCapacity = totalCapacity;
+    }
+    return workerParallelIndexCapacity;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -140,6 +187,9 @@ public class ImmutableWorkerInfo
     if (currCapacityUsed != that.currCapacityUsed) {
       return false;
     }
+    if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) {
+      return false;
+    }
     if (!worker.equals(that.worker)) {
       return false;
     }
@@ -162,6 +212,7 @@ public class ImmutableWorkerInfo
   {
     int result = worker.hashCode();
     result = 31 * result + currCapacityUsed;
+    result = 31 * result + currParallelIndexCapacityUsed;
     result = 31 * result + availabilityGroups.hashCode();
     result = 31 * result + runningTasks.hashCode();
     result = 31 * result + lastCompletedTaskTime.hashCode();
@@ -175,6 +226,7 @@ public class ImmutableWorkerInfo
     return "ImmutableWorkerInfo{" +
            "worker=" + worker +
            ", currCapacityUsed=" + currCapacityUsed +
+           ", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed +
            ", availabilityGroups=" + availabilityGroups +
            ", runningTasks=" + runningTasks +
            ", lastCompletedTaskTime=" + lastCompletedTaskTime +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
index 2be16ae..a875090 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.druid.annotations.UsedInGeneratedCode;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.DateTimes;
@@ -110,6 +111,18 @@ public class ZkWorker implements Closeable
     return currCapacity;
   }
 
+  @JsonProperty("currParallelIndexCapacityUsed")
+  public int getCurrParallelIndexCapacityUsed()
+  {
+    int currParallelIndexCapacityUsed = 0;
+    for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
+      if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
+        currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
+      }
+    }
+    return currParallelIndexCapacityUsed;
+  }
+
   @JsonProperty("availabilityGroups")
   public Set<String> getAvailabilityGroups()
   {
@@ -168,6 +181,7 @@ public class ZkWorker implements Closeable
     return new ImmutableWorkerInfo(
         worker.get(),
         getCurrCapacityUsed(),
+        getCurrParallelIndexCapacityUsed(),
         getAvailabilityGroups(),
         getRunningTaskIds(),
         lastCompletedTaskTime.get(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 7d83b3d..1c9ba5f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -477,9 +478,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
 
   private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task)
   {
+    int parallelIndexTaskCapacity = task.getType().equals(ParallelIndexSupervisorTask.TYPE)
+                                    ? task.getTaskResource().getRequiredCapacity()
+                                    : 0;
     return new ImmutableWorkerInfo(
         immutableWorker.getWorker(),
         immutableWorker.getCurrCapacityUsed() + 1,
+        immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity,
         Sets.union(
             immutableWorker.getAvailabilityGroups(),
             Sets.newHashSet(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java
index c7f1345..d916b3e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java
@@ -26,8 +26,26 @@ public class WorkerTaskRunnerConfig
   @JsonProperty
   private String minWorkerVersion = "0";
 
+  @JsonProperty
+  private double parallelIndexTaskSlotRatio = 1;
+
   public String getMinWorkerVersion()
   {
     return minWorkerVersion;
   }
+
+  /**
+   * The number of task slots that a parallel indexing task can take is restricted using this config as a multiplier
+   *
+   * A value of 1 means no restriction on the number of slots ParallelIndexSupervisorTasks can occupy (default behaviour)
+   * A value of 0 restricts ParallelIndexSupervisorTasks to a single slot per worker, as at least 1 slot is always kept available to them.
+   * Deadlocks can occur if all the task slots are occupied by ParallelIndexSupervisorTasks,
+   * as no subtask would ever get a slot. Set this config to a value < 1 to prevent deadlocks.
+   *
+   * @return ratio of task slots available to a parallel indexing task at a worker level
+   */
+  public double getParallelIndexTaskSlotRatio()
+  {
+    return parallelIndexTaskSlotRatio;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index ff9aab1..0bf4de0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.TaskRunnerUtils;
 import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
@@ -149,6 +150,17 @@ public class WorkerHolder
     return currCapacity;
   }
 
+  private int getCurrParallelIndexCapcityUsed()
+  {
+    int currParallelIndexCapacityUsed = 0;
+    for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
+      if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
+        currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
+      }
+    }
+    return currParallelIndexCapacityUsed;
+  }
+
   private Set<String> getAvailabilityGroups()
   {
     Set<String> retVal = new HashSet<>();
@@ -193,6 +205,7 @@ public class WorkerHolder
     return new ImmutableWorkerInfo(
         w,
         getCurrCapacityUsed(),
+        getCurrParallelIndexCapcityUsed(),
         getAvailabilityGroups(),
         getRunningTasks().keySet(),
         lastCompletedTaskTime.get(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
index 24721e8..c3832da 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
@@ -150,7 +150,7 @@ public class WorkerSelectUtils
   {
     return allWorkers.values()
                      .stream()
-                     .filter(worker -> worker.canRunTask(task)
+                     .filter(worker -> worker.canRunTask(task, workerTaskRunnerConfig.getParallelIndexTaskSlotRatio())
                                        && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
                      .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
index 785698e..e373eb9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
@@ -22,6 +22,10 @@ package org.apache.druid.indexing.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -29,6 +33,9 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class ImmutableWorkerInfoTest
 {
   @Test
@@ -193,6 +200,7 @@ public class ImmutableWorkerInfoTest
             "http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
         ),
         3,
+        0,
         ImmutableSet.of("grp1", "grp2"),
         ImmutableSet.of("task1", "task2"),
         DateTimes.of("2015-01-01T01:01:01Z"),
@@ -202,6 +210,7 @@ public class ImmutableWorkerInfoTest
             "http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
         ),
         2,
+        0,
         ImmutableSet.of("grp1", "grp2"),
         ImmutableSet.of("task1", "task2"),
         DateTimes.of("2015-01-01T01:01:02Z"),
@@ -209,6 +218,57 @@ public class ImmutableWorkerInfoTest
     ), false);
   }
 
+  @Test
+  public void test_canRunTask()
+  {
+    ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo(
+        new Worker("http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY),
+        6,
+        0,
+        ImmutableSet.of("grp1", "grp2"),
+        ImmutableSet.of("task1", "task2"),
+        DateTimes.of("2015-01-01T01:01:02Z")
+    );
+
+
+    // Parallel index task
+    TaskResource taskResource0 = mock(TaskResource.class);
+    when(taskResource0.getRequiredCapacity()).thenReturn(3);
+    Task parallelIndexTask = mock(ParallelIndexSupervisorTask.class);
+    when(parallelIndexTask.getType()).thenReturn(ParallelIndexSupervisorTask.TYPE);
+    when(parallelIndexTask.getTaskResource()).thenReturn(taskResource0);
+
+    // Since task satisfies parallel and total slot constraints, can run
+    Assert.assertTrue(workerInfo.canRunTask(parallelIndexTask, 0.5));
+
+    // Since task fails the parallel slot constraint, it cannot run (3 > 1)
+    Assert.assertFalse(workerInfo.canRunTask(parallelIndexTask, 0.1));
+
+
+    // Some other indexing task
+    TaskResource taskResource1 = mock(TaskResource.class);
+    when(taskResource1.getRequiredCapacity()).thenReturn(5);
+    Task anyOtherTask = mock(IndexTask.class);
+    when(anyOtherTask.getType()).thenReturn("index");
+    when(anyOtherTask.getTaskResource()).thenReturn(taskResource1);
+
+    // Not a parallel index task ->  satisfies parallel index constraint
+    // But does not satisfy the total slot constraint and cannot run (11 > 10)
+    Assert.assertFalse(workerInfo.canRunTask(anyOtherTask, 0.5));
+
+
+    // Task has an availability conflict ("grp1")
+    TaskResource taskResource2 = mock(TaskResource.class);
+    when(taskResource2.getRequiredCapacity()).thenReturn(1);
+    when(taskResource2.getAvailabilityGroup()).thenReturn("grp1");
+    Task grp1Task = mock(IndexTask.class);
+    when(grp1Task.getType()).thenReturn("blah");
+    when(grp1Task.getTaskResource()).thenReturn(taskResource2);
+
+    // Satisfies parallel index and total index slot constraints but cannot run due to availability group conflict
+    Assert.assertFalse(workerInfo.canRunTask(grp1Task, 0.3));
+  }
+
   private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch)
   {
     if (shouldMatch) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java
index 8717edc..c767bf6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java
@@ -57,10 +57,11 @@ public class JavaScriptWorkerSelectStrategyTest
       + "}\n"
       + "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n"
       + "var minWorkerVer = config.getMinWorkerVersion();\n"
+      + "var parallelIndexTaskSlotRatio = config.getParallelIndexTaskSlotRatio();\n"
       + "for (var i = 0; i < sortedWorkers.length; i++) {\n"
       + " var worker = sortedWorkers[i];\n"
       + "  var zkWorker = zkWorkers.get(worker);\n"
-      + "  if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {\n"
+      + "  if (zkWorker.canRunTask(task, parallelIndexTaskSlotRatio) && zkWorker.isValidVersion(minWorkerVer)) {\n"
       + "    if (task.getType() == 'index_hadoop' && batch_workers.contains(worker)) {\n"
       + "      return worker;\n"
       + "    } else {\n"
@@ -238,8 +239,9 @@ public class JavaScriptWorkerSelectStrategyTest
   private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
   {
     ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class);
-    EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes();
+    EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class), EasyMock.anyDouble())).andReturn(canRunTask).anyTimes();
     EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
+    EasyMock.expect(worker.getCurrParallelIndexCapacityUsed()).andReturn(0).anyTimes();
     EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();
     EasyMock.replay(worker);
     return worker;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org