You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2022/02/15 17:45:52 UTC
[druid] branch master updated: Add config to limit task slots for parallel indexing tasks (#12221)
This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 393e9b6 Add config to limit task slots for parallel indexing tasks (#12221)
393e9b6 is described below
commit 393e9b68a8875e07942528720a533bc26580abb8
Author: AmatyaAvadhanula <95...@users.noreply.github.com>
AuthorDate: Tue Feb 15 23:15:09 2022 +0530
Add config to limit task slots for parallel indexing tasks (#12221)
In extreme cases where many parallel indexing jobs are submitted together, the
`ParallelIndexSupervisorTasks` can take up all task slots, leaving no slot to schedule
their own sub-tasks and thus stalling the progress of all the indexing jobs.
Key changes:
- Add config `druid.indexer.runner.parallelIndexTaskSlotRatio` to limit the task slots
for `ParallelIndexSupervisorTasks` per worker
- `ratio = 1` implies supervisor tasks can use all slots on a worker if needed (default behavior)
- `ratio = 0` nominally implies supervisor tasks cannot use any slot on a worker
(in practice, at least 1 slot per worker is always kept available to them to ensure progress of parallel indexing jobs)
- `ImmutableWorkerInfo.canRunTask()`
- `WorkerHolder`, `ZkWorker`, `WorkerSelectUtils`
---
docs/configuration/index.md | 1 +
.../indexing/overlord/ImmutableWorkerInfo.java | 56 +++++++++++++++++++-
.../apache/druid/indexing/overlord/ZkWorker.java | 14 +++++
...PendingTaskBasedWorkerProvisioningStrategy.java | 5 ++
.../overlord/config/WorkerTaskRunnerConfig.java | 18 +++++++
.../druid/indexing/overlord/hrtr/WorkerHolder.java | 13 +++++
.../indexing/overlord/setup/WorkerSelectUtils.java | 2 +-
.../indexing/overlord/ImmutableWorkerInfoTest.java | 60 ++++++++++++++++++++++
.../setup/JavaScriptWorkerSelectStrategyTest.java | 6 ++-
9 files changed, 170 insertions(+), 5 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 9c37d4a..cf9f2f0 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1066,6 +1066,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a MiddleManager before throwing an error.|PT5M|
|`druid.indexer.runner.minWorkerVersion`|The minimum MiddleManager version to send tasks to. |"0"|
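+| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range [0, 1]. The resulting slot count is rounded down, but at least one slot is always kept available for supervisor tasks on a worker with non-zero capacity, even when the ratio is 0. |1|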
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect MiddleManagers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper, should be in the range of [10KiB, 2GiB). [Human-readable format](human-readable-byte.md) is supported.| 512 KiB |
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a MiddleManager is disconnected from Zookeeper.|PT15M|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index 2c07758..aaea3f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
@@ -39,6 +40,7 @@ public class ImmutableWorkerInfo
{
private final Worker worker;
private final int currCapacityUsed;
+ private final int currParallelIndexCapacityUsed;
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
@@ -48,6 +50,7 @@ public class ImmutableWorkerInfo
public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
+ @JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@@ -56,6 +59,7 @@ public class ImmutableWorkerInfo
{
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
+ this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime;
@@ -65,12 +69,25 @@ public class ImmutableWorkerInfo
public ImmutableWorkerInfo(
Worker worker,
int currCapacityUsed,
+ int currParallelIndexCapacityUsed,
Set<String> availabilityGroups,
Collection<String> runningTasks,
DateTime lastCompletedTaskTime
)
{
- this(worker, currCapacityUsed, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
+ this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
+ runningTasks, lastCompletedTaskTime, null);
+ }
+
+ public ImmutableWorkerInfo(
+ Worker worker,
+ int currCapacityUsed,
+ Set<String> availabilityGroups,
+ Collection<String> runningTasks,
+ DateTime lastCompletedTaskTime
+ )
+ {
+ this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
}
@JsonProperty("worker")
@@ -85,6 +102,12 @@ public class ImmutableWorkerInfo
return currCapacityUsed;
}
+ @JsonProperty("currParallelIndexCapacityUsed")
+ public int getCurrParallelIndexCapacityUsed()
+ {
+ return currParallelIndexCapacityUsed;
+ }
+
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
@@ -119,12 +142,36 @@ public class ImmutableWorkerInfo
return worker.getVersion().compareTo(minVersion) >= 0;
}
- public boolean canRunTask(Task task)
+ public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
{
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
+ && canRunParallelIndexTask(task, parallelIndexTaskSlotRatio)
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
}
+ private boolean canRunParallelIndexTask(Task task, double parallelIndexTaskSlotRatio)
+ {
+ if (!task.getType().equals(ParallelIndexSupervisorTask.TYPE)) {
+ return true;
+ }
+ return getWorkerParallelIndexCapacity(parallelIndexTaskSlotRatio) - getCurrParallelIndexCapacityUsed()
+ >= task.getTaskResource().getRequiredCapacity();
+
+ }
+
+ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
+ {
+ int totalCapacity = worker.getCapacity();
+ int workerParallelIndexCapacity = (int) Math.floor(parallelIndexTaskSlotRatio * totalCapacity);
+ if (workerParallelIndexCapacity < 1) {
+ workerParallelIndexCapacity = 1;
+ }
+ if (workerParallelIndexCapacity > totalCapacity) {
+ workerParallelIndexCapacity = totalCapacity;
+ }
+ return workerParallelIndexCapacity;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -140,6 +187,9 @@ public class ImmutableWorkerInfo
if (currCapacityUsed != that.currCapacityUsed) {
return false;
}
+ if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) {
+ return false;
+ }
if (!worker.equals(that.worker)) {
return false;
}
@@ -162,6 +212,7 @@ public class ImmutableWorkerInfo
{
int result = worker.hashCode();
result = 31 * result + currCapacityUsed;
+ result = 31 * result + currParallelIndexCapacityUsed;
result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode();
@@ -175,6 +226,7 @@ public class ImmutableWorkerInfo
return "ImmutableWorkerInfo{" +
"worker=" + worker +
", currCapacityUsed=" + currCapacityUsed +
+ ", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed +
", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
index 2be16ae..a875090 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.druid.annotations.UsedInGeneratedCode;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
@@ -110,6 +111,18 @@ public class ZkWorker implements Closeable
return currCapacity;
}
+ @JsonProperty("currParallelIndexCapacityUsed")
+ public int getCurrParallelIndexCapacityUsed()
+ {
+ int currParallelIndexCapacityUsed = 0;
+ for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
+ if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
+ currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
+ }
+ }
+ return currParallelIndexCapacityUsed;
+ }
+
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
@@ -168,6 +181,7 @@ public class ZkWorker implements Closeable
return new ImmutableWorkerInfo(
worker.get(),
getCurrCapacityUsed(),
+ getCurrParallelIndexCapacityUsed(),
getAvailabilityGroups(),
getRunningTaskIds(),
lastCompletedTaskTime.get(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 7d83b3d..1c9ba5f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -477,9 +478,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
private static ImmutableWorkerInfo workerWithTask(ImmutableWorkerInfo immutableWorker, Task task)
{
+ int parallelIndexTaskCapacity = task.getType().equals(ParallelIndexSupervisorTask.TYPE)
+ ? task.getTaskResource().getRequiredCapacity()
+ : 0;
return new ImmutableWorkerInfo(
immutableWorker.getWorker(),
immutableWorker.getCurrCapacityUsed() + 1,
+ immutableWorker.getCurrParallelIndexCapacityUsed() + parallelIndexTaskCapacity,
Sets.union(
immutableWorker.getAvailabilityGroups(),
Sets.newHashSet(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java
index c7f1345..d916b3e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java
@@ -26,8 +26,26 @@ public class WorkerTaskRunnerConfig
@JsonProperty
private String minWorkerVersion = "0";
+ @JsonProperty
+ private double parallelIndexTaskSlotRatio = 1;
+
public String getMinWorkerVersion()
{
return minWorkerVersion;
}
+
+ /**
+ * The number of task slots that parallel indexing supervisor tasks can take on a worker is restricted using
+ * this config as a multiplier of the worker's total capacity.
+ *
+ * A value of 1 means no restriction on the number of slots ParallelIndexSupervisorTasks can occupy (default behaviour)
+ * A value of 0 nominally means ParallelIndexSupervisorTasks can occupy no slots; in practice the computed
+ * per-worker limit is never reduced below 1 slot, so that parallel indexing jobs can still make progress.
+ * Deadlocks can occur if all the task slots are occupied by ParallelIndexSupervisorTasks,
+ * as no subtask would ever get a slot. Set this config to a value {@code < 1} to prevent deadlocks.
+ *
+ * @return ratio of task slots available to a parallel indexing task at a worker level
+ */
+ public double getParallelIndexTaskSlotRatio()
+ {
+ return parallelIndexTaskSlotRatio;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index ff9aab1..0bf4de0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
@@ -149,6 +150,17 @@ public class WorkerHolder
return currCapacity;
}
+ private int getCurrParallelIndexCapcityUsed()
+ {
+ int currParallelIndexCapacityUsed = 0;
+ for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
+ if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
+ currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
+ }
+ }
+ return currParallelIndexCapacityUsed;
+ }
+
private Set<String> getAvailabilityGroups()
{
Set<String> retVal = new HashSet<>();
@@ -193,6 +205,7 @@ public class WorkerHolder
return new ImmutableWorkerInfo(
w,
getCurrCapacityUsed(),
+ getCurrParallelIndexCapcityUsed(),
getAvailabilityGroups(),
getRunningTasks().keySet(),
lastCompletedTaskTime.get(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
index 24721e8..c3832da 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
@@ -150,7 +150,7 @@ public class WorkerSelectUtils
{
return allWorkers.values()
.stream()
- .filter(worker -> worker.canRunTask(task)
+ .filter(worker -> worker.canRunTask(task, workerTaskRunnerConfig.getParallelIndexTaskSlotRatio())
&& worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
.collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
index 785698e..e373eb9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
@@ -22,6 +22,10 @@ package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -29,6 +33,9 @@ import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class ImmutableWorkerInfoTest
{
@Test
@@ -193,6 +200,7 @@ public class ImmutableWorkerInfoTest
"http", "testWorker1", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
3,
+ 0,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z"),
@@ -202,6 +210,7 @@ public class ImmutableWorkerInfoTest
"http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
+ 0,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:02Z"),
@@ -209,6 +218,57 @@ public class ImmutableWorkerInfoTest
), false);
}
+ @Test
+ public void test_canRunTask()
+ {
+ ImmutableWorkerInfo workerInfo = new ImmutableWorkerInfo(
+ new Worker("http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY),
+ 6,
+ 0,
+ ImmutableSet.of("grp1", "grp2"),
+ ImmutableSet.of("task1", "task2"),
+ DateTimes.of("2015-01-01T01:01:02Z")
+ );
+
+
+ // Parallel index task
+ TaskResource taskResource0 = mock(TaskResource.class);
+ when(taskResource0.getRequiredCapacity()).thenReturn(3);
+ Task parallelIndexTask = mock(ParallelIndexSupervisorTask.class);
+ when(parallelIndexTask.getType()).thenReturn(ParallelIndexSupervisorTask.TYPE);
+ when(parallelIndexTask.getTaskResource()).thenReturn(taskResource0);
+
+ // Since the task satisfies both the parallel index and total slot constraints, it can run
+ Assert.assertTrue(workerInfo.canRunTask(parallelIndexTask, 0.5));
+
+ // Since task fails the parallel slot constraint, it cannot run (3 > 1)
+ Assert.assertFalse(workerInfo.canRunTask(parallelIndexTask, 0.1));
+
+
+ // Some other indexing task
+ TaskResource taskResource1 = mock(TaskResource.class);
+ when(taskResource1.getRequiredCapacity()).thenReturn(5);
+ Task anyOtherTask = mock(IndexTask.class);
+ when(anyOtherTask.getType()).thenReturn("index");
+ when(anyOtherTask.getTaskResource()).thenReturn(taskResource1);
+
+ // Not a parallel index task -> satisfies parallel index constraint
+ // But does not satisfy the total slot constraint and cannot run (11 > 10)
+ Assert.assertFalse(workerInfo.canRunTask(anyOtherTask, 0.5));
+
+
+ // Task has an availability conflict ("grp1")
+ TaskResource taskResource2 = mock(TaskResource.class);
+ when(taskResource2.getRequiredCapacity()).thenReturn(1);
+ when(taskResource2.getAvailabilityGroup()).thenReturn("grp1");
+ Task grp1Task = mock(IndexTask.class);
+ when(grp1Task.getType()).thenReturn("blah");
+ when(grp1Task.getTaskResource()).thenReturn(taskResource2);
+
+ // Satisfies parallel index and total slot constraints but cannot run due to the availability group conflict
+ Assert.assertFalse(workerInfo.canRunTask(grp1Task, 0.3));
+ }
+
private void assertEqualsAndHashCode(ImmutableWorkerInfo o1, ImmutableWorkerInfo o2, boolean shouldMatch)
{
if (shouldMatch) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java
index 8717edc..c767bf6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java
@@ -57,10 +57,11 @@ public class JavaScriptWorkerSelectStrategyTest
+ "}\n"
+ "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n"
+ "var minWorkerVer = config.getMinWorkerVersion();\n"
+ + "var parallelIndexTaskSlotRatio = config.getParallelIndexTaskSlotRatio();\n"
+ "for (var i = 0; i < sortedWorkers.length; i++) {\n"
+ " var worker = sortedWorkers[i];\n"
+ " var zkWorker = zkWorkers.get(worker);\n"
- + " if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {\n"
+ + " if (zkWorker.canRunTask(task, parallelIndexTaskSlotRatio) && zkWorker.isValidVersion(minWorkerVer)) {\n"
+ " if (task.getType() == 'index_hadoop' && batch_workers.contains(worker)) {\n"
+ " return worker;\n"
+ " } else {\n"
@@ -238,8 +239,9 @@ public class JavaScriptWorkerSelectStrategyTest
private ImmutableWorkerInfo createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion)
{
ImmutableWorkerInfo worker = EasyMock.createMock(ImmutableWorkerInfo.class);
- EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes();
+ EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class), EasyMock.anyDouble())).andReturn(canRunTask).anyTimes();
EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes();
+ EasyMock.expect(worker.getCurrParallelIndexCapacityUsed()).andReturn(0).anyTimes();
EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes();
EasyMock.replay(worker);
return worker;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org