You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/17 18:35:41 UTC
[incubator-pinot] branch master updated: Enhance task schedule api
for single type/table support (#6352)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2ea0185 Enhance task schedule api for single type/table support (#6352)
2ea0185 is described below
commit 2ea01854866482fe8206a55dec4f2f1b2a648874
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Dec 17 10:35:22 2020 -0800
Enhance task schedule api for single type/table support (#6352)
Added 2 optional query parameters to the task schedule API to schedule tasks for the given task type/table.
Examples:
- POST /tasks/schedule?taskType=MyTask
- POST /tasks/schedule?tableName=myTable_OFFLINE
- POST /tasks/schedule?taskType=MyTask&tableName=myTable_OFFLINE
---
.../apache/pinot/common/minion/MinionClient.java | 17 ++-
.../common/minion/MinionRequestURLBuilder.java | 14 +-
.../pinot/common/minion/MinionClientTest.java | 2 +-
.../api/resources/PinotTaskRestletResource.java | 16 +-
.../helix/core/minion/PinotTaskManager.java | 169 ++++++++++++++-------
...vertToRawIndexMinionClusterIntegrationTest.java | 6 +-
...fflineSegmentsMinionClusterIntegrationTest.java | 6 +-
.../tests/SimpleMinionClusterIntegrationTest.java | 17 +--
8 files changed, 157 insertions(+), 90 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
index beb98d5..dffb841 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.common.minion;
+import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
@@ -30,8 +32,6 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MinionClient {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MinionClient.class);
private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
private static final String ACCEPT = "accept";
private static final String APPLICATION_JSON = "application/json";
@@ -67,9 +65,10 @@ public class MinionClient {
return _controllerUrl;
}
- public Map<String, String> scheduleMinionTasks()
+ public Map<String, String> scheduleMinionTasks(@Nullable String taskType, @Nullable String tableNameWithType)
throws IOException {
- HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule());
+ HttpPost httpPost = createHttpPostRequest(
+ MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule(taskType, tableNameWithType));
HttpResponse response = HTTP_CLIENT.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
final String responseString = IOUtils.toString(response.getEntity().getContent());
@@ -77,7 +76,8 @@ public class MinionClient {
throw new HttpException(String
.format("Unable to schedule minion tasks. Error code %d, Error message: %s", statusCode, responseString));
}
- return JsonUtils.stringToObject(responseString, Map.class);
+ return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() {
+ });
}
public Map<String, String> getTasksStates(String taskType)
@@ -91,7 +91,8 @@ public class MinionClient {
throw new HttpException(String
.format("Unable to get tasks states map. Error code %d, Error message: %s", statusCode, responseString));
}
- return JsonUtils.stringToObject(responseString, Map.class);
+ return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() {
+ });
}
public String getTaskState(String taskName)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
index 4432bc9..01e8f2f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.minion;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.StringUtil;
@@ -37,8 +38,17 @@ public class MinionRequestURLBuilder {
return new MinionRequestURLBuilder(baseUrl);
}
- public String forTaskSchedule() {
- return StringUtil.join("/", _baseUrl, "tasks/schedule");
+ public String forTaskSchedule(@Nullable String taskType, @Nullable String tableNameWithType) {
+ String url = StringUtil.join("/", _baseUrl, "tasks/schedule");
+ if (taskType != null && tableNameWithType != null) {
+ return url + "?taskType=" + taskType + "&tableName=" + tableNameWithType;
+ } else if (taskType != null) {
+ return url + "?taskType=" + taskType;
+ } else if (tableNameWithType != null) {
+ return url + "?tableName=" + tableNameWithType;
+ } else {
+ return url;
+ }
}
public String forListAllTasks(String taskType) {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
index 869dde1..eddb888 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
@@ -63,7 +63,7 @@ public class MinionClientTest {
createHandler(200, "{\"SegmentGenerationAndPushTask\":\"Task_SegmentGenerationAndPushTask_1607470525615\"}",
0));
MinionClient minionClient = new MinionClient("localhost", "14202");
- Assert.assertEquals(minionClient.scheduleMinionTasks().get("SegmentGenerationAndPushTask"),
+ Assert.assertEquals(minionClient.scheduleMinionTasks(null, null).get("SegmentGenerationAndPushTask"),
"Task_SegmentGenerationAndPushTask_1607470525615");
httpServer.stop(0);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 745ff30..aa3235e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -168,9 +169,18 @@ public class PinotTaskRestletResource {
@POST
@Path("/tasks/schedule")
- @ApiOperation("Schedule tasks")
- public Map<String, String> scheduleTasks() {
- return _pinotTaskManager.scheduleTasks();
+ @ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
+ public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+ @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+ if (taskType != null) {
+ // Schedule task for the given task type
+ String taskName = tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName)
+ : _pinotTaskManager.scheduleTask(taskType);
+ return Collections.singletonMap(taskType, taskName);
+ } else {
+ // Schedule tasks for all task types
+ return tableName != null ? _pinotTaskManager.scheduleTasks(tableName) : _pinotTaskManager.scheduleTasks();
+ }
}
@Deprecated
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a17718..fe97326 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
@@ -33,7 +36,6 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegi
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,11 +65,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
}
/**
- * Returns the cluster info provider.
- * <p>
- * Cluster info provider might be useful when initializing task generators.
- *
- * @return Cluster info provider
+ * Returns the cluster info accessor.
+ * <p>Cluster info accessor can be used to initialize the task generator.
*/
public ClusterInfoAccessor getClusterInfoAccessor() {
return _clusterInfoAccessor;
@@ -75,87 +74,139 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
/**
* Registers a task generator.
- * <p>
- * This method can be used to plug in custom task generators.
- *
- * @param pinotTaskGenerator Task generator to be registered
+ * <p>This method can be used to plug in custom task generators.
*/
- public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
- _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
+ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
+ _taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
}
/**
- * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader.
+ * Public API to schedule tasks (all task types) for all tables. It might be called from the non-leader controller.
+ * Returns a map from the task type to the task scheduled.
*/
public synchronized Map<String, String> scheduleTasks() {
- Map<String, String> tasksScheduled = scheduleTasks(_pinotHelixResourceManager.getAllTables());
-
- // Reset the task because this method will be called from the Rest API instead of the periodic task scheduler
- // TODO: Clean up only the non-leader tables instead of all tables
- cleanUpTask();
- setUpTask();
-
- return tasksScheduled;
+ return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
}
/**
- * Check the Pinot cluster status and schedule new tasks for the given tables.
- *
- * @param tableNamesWithType List of table names with type suffix
- * @return Map from task type to task scheduled
+ * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
+ * from the task type to the task scheduled.
*/
- private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType) {
+ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
- Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
- int numTaskTypes = taskTypes.size();
- Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(numTaskTypes);
-
- for (String taskType : taskTypes) {
- enabledTableConfigMap.put(taskType, new ArrayList<>());
-
- // Ensure all task queues exist
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- }
-
// Scan all table configs to get the tables with tasks enabled
+ Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
for (String tableNameWithType : tableNamesWithType) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null) {
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig != null) {
- for (String taskType : taskTypes) {
- if (taskConfig.isTaskTypeEnabled(taskType)) {
- enabledTableConfigMap.get(taskType).add(tableConfig);
- }
- }
+ if (tableConfig != null && tableConfig.getTaskConfig() != null) {
+ Set<String> enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
+ for (String enabledTaskType : enabledTaskTypes) {
+ enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList<>()).add(tableConfig);
}
}
}
// Generate each type of tasks
- Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
- for (String taskType : taskTypes) {
- LOGGER.info("Generating tasks for task type: {}", taskType);
- PinotTaskGenerator pinotTaskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
- List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
- int numTasks = pinotTaskConfigs.size();
- if (numTasks > 0) {
- LOGGER
- .info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
- tasksScheduled.put(taskType, _helixTaskResourceManager
- .submitTask(pinotTaskConfigs, pinotTaskGenerator.getTaskTimeoutMs(),
- pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
- _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+ Map<String, String> tasksScheduled = new HashMap<>();
+ for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
+ String taskType = entry.getKey();
+ List<TableConfig> enabledTableConfigs = entry.getValue();
+ PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+ if (taskGenerator != null) {
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader));
+ } else {
+ List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size());
+ for (TableConfig enabledTableConfig : enabledTableConfigs) {
+ enabledTables.add(enabledTableConfig.getTableName());
+ }
+ LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
+ tasksScheduled.put(taskType, null);
}
}
return tasksScheduled;
}
+ /**
+ * Helper method to schedule task with the given task generator for the given tables that have the task enabled.
+ * Returns the task name, or {@code null} if no task is scheduled.
+ */
+ @Nullable
+ private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
+ boolean isLeader) {
+ List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
+ if (!isLeader) {
+ taskGenerator.nonLeaderCleanUp();
+ }
+ int numTasks = pinotTaskConfigs.size();
+ if (numTasks > 0) {
+ String taskType = taskGenerator.getTaskType();
+ LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
+ _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+ return _helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(),
+ taskGenerator.getNumConcurrentTasksPerInstance());
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
+ * controller. Returns a map from the task type to the task scheduled.
+ */
+ @Nullable
+ public synchronized Map<String, String> scheduleTasks(String tableNameWithType) {
+ return scheduleTasks(Collections.singletonList(tableNameWithType), false);
+ }
+
+ /**
+ * Public API to schedule task for the given task type. It might be called from the non-leader controller. Returns the
+ * task name, or {@code null} if no task is scheduled.
+ */
+ @Nullable
+ public synchronized String scheduleTask(String taskType) {
+ PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+ Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
+
+ // Scan all table configs to get the tables with task enabled
+ List<TableConfig> enabledTableConfigs = new ArrayList<>();
+ for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
+ .isTaskTypeEnabled(taskType)) {
+ enabledTableConfigs.add(tableConfig);
+ }
+ }
+
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ return scheduleTask(taskGenerator, enabledTableConfigs, false);
+ }
+
+ /**
+ * Public API to schedule task for the given task type on the given table. It might be called from the non-leader
+ * controller. Returns the task name, or {@code null} if no task is scheduled.
+ */
+ @Nullable
+ public synchronized String scheduleTask(String taskType, String tableNameWithType) {
+ PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+ Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
+
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
+
+ Preconditions
+ .checkState(tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
+ "Table: %s does not have task type: %s enabled", tableNameWithType, taskType);
+
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false);
+ }
+
@Override
protected void processTables(List<String> tableNamesWithType) {
- scheduleTasks(tableNamesWithType);
+ scheduleTasks(tableNamesWithType, true);
}
@Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 9fa9519..6848da8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -109,15 +109,15 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
}
// Should create the task queues and generate a ConvertToRawIndexTask task with 5 child tasks
- Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE));
+ Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(ConvertToRawIndexTask.TASK_TYPE)));
// Should generate one more ConvertToRawIndexTask task with 3 child tasks
- Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE));
+ Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
// Should not generate more tasks
- Assert.assertFalse(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE));
+ Assert.assertNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks COMPLETED and new segments refreshed
TestUtils.waitForCondition(input -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 3f80e95..6079bfd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -106,13 +106,11 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt
long offlineSegmentTime = _dateSmallestDays;
for (int i = 0; i < 3; i++) {
// Schedule task
- Assert.assertTrue(
- _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ Assert.assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- Assert.assertFalse(
- _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ Assert.assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
expectedWatermark = expectedWatermark + 86400000;
// Wait at most 600 seconds for all tasks COMPLETED
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 5232b7a..e0a4ecc 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -48,10 +48,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
/**
@@ -109,15 +106,16 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
HOLD.set(true);
// Should create the task queues and generate a task
- assertTrue(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE));
+ assertNotNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TestTaskGenerator.TASK_TYPE)));
// Should generate one more task
- assertTrue(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE));
+ assertNotNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
// Should not generate more tasks
- assertFalse(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE));
+ assertNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
+ assertNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
@@ -178,9 +176,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
_helixTaskResourceManager.deleteTaskQueue(TestTaskGenerator.TASK_TYPE, false);
// Wait at most 60 seconds for task queue to be deleted
- TestUtils.waitForCondition(input -> {
- return !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE);
- }, STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
+ TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE),
+ STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org