You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/17 18:35:41 UTC

[incubator-pinot] branch master updated: Enhance task schedule api for single type/table support (#6352)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea0185  Enhance task schedule api for single type/table support (#6352)
2ea0185 is described below

commit 2ea01854866482fe8206a55dec4f2f1b2a648874
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Dec 17 10:35:22 2020 -0800

    Enhance task schedule api for single type/table support (#6352)
    
    Added 2 optional query parameters to the task schedule API to schedule tasks for the given task type/table.
    Examples:
    - POST /tasks/schedule?taskType=MyTask
    - POST /tasks/schedule?tableName=myTable_OFFLINE
    - POST /tasks/schedule?taskType=MyTask&tableName=myTable_OFFLINE
---
 .../apache/pinot/common/minion/MinionClient.java   |  17 ++-
 .../common/minion/MinionRequestURLBuilder.java     |  14 +-
 .../pinot/common/minion/MinionClientTest.java      |   2 +-
 .../api/resources/PinotTaskRestletResource.java    |  16 +-
 .../helix/core/minion/PinotTaskManager.java        | 169 ++++++++++++++-------
 ...vertToRawIndexMinionClusterIntegrationTest.java |   6 +-
 ...fflineSegmentsMinionClusterIntegrationTest.java |   6 +-
 .../tests/SimpleMinionClusterIntegrationTest.java  |  17 +--
 8 files changed, 157 insertions(+), 90 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
index beb98d5..dffb841 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.common.minion;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.httpclient.HttpException;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
@@ -30,8 +32,6 @@ import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class MinionClient {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(MinionClient.class);
   private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
   private static final String ACCEPT = "accept";
   private static final String APPLICATION_JSON = "application/json";
@@ -67,9 +65,10 @@ public class MinionClient {
     return _controllerUrl;
   }
 
-  public Map<String, String> scheduleMinionTasks()
+  public Map<String, String> scheduleMinionTasks(@Nullable String taskType, @Nullable String tableNameWithType)
       throws IOException {
-    HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule());
+    HttpPost httpPost = createHttpPostRequest(
+        MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule(taskType, tableNameWithType));
     HttpResponse response = HTTP_CLIENT.execute(httpPost);
     int statusCode = response.getStatusLine().getStatusCode();
     final String responseString = IOUtils.toString(response.getEntity().getContent());
@@ -77,7 +76,8 @@ public class MinionClient {
       throw new HttpException(String
           .format("Unable to schedule minion tasks. Error code %d, Error message: %s", statusCode, responseString));
     }
-    return JsonUtils.stringToObject(responseString, Map.class);
+    return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() {
+    });
   }
 
   public Map<String, String> getTasksStates(String taskType)
@@ -91,7 +91,8 @@ public class MinionClient {
       throw new HttpException(String
           .format("Unable to get tasks states map. Error code %d, Error message: %s", statusCode, responseString));
     }
-    return JsonUtils.stringToObject(responseString, Map.class);
+    return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() {
+    });
   }
 
   public String getTaskState(String taskName)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
index 4432bc9..01e8f2f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.minion;
 
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.StringUtil;
 
@@ -37,8 +38,17 @@ public class MinionRequestURLBuilder {
     return new MinionRequestURLBuilder(baseUrl);
   }
 
-  public String forTaskSchedule() {
-    return StringUtil.join("/", _baseUrl, "tasks/schedule");
+  public String forTaskSchedule(@Nullable String taskType, @Nullable String tableNameWithType) {
+    String url = StringUtil.join("/", _baseUrl, "tasks/schedule");
+    if (taskType != null && tableNameWithType != null) {
+      return url + "?taskType=" + taskType + "&tableName=" + tableNameWithType;
+    } else if (taskType != null) {
+      return url + "?taskType=" + taskType;
+    } else if (tableNameWithType != null) {
+      return url + "?tableName=" + tableNameWithType;
+    } else {
+      return url;
+    }
   }
 
   public String forListAllTasks(String taskType) {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
index 869dde1..eddb888 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
@@ -63,7 +63,7 @@ public class MinionClientTest {
         createHandler(200, "{\"SegmentGenerationAndPushTask\":\"Task_SegmentGenerationAndPushTask_1607470525615\"}",
             0));
     MinionClient minionClient = new MinionClient("localhost", "14202");
-    Assert.assertEquals(minionClient.scheduleMinionTasks().get("SegmentGenerationAndPushTask"),
+    Assert.assertEquals(minionClient.scheduleMinionTasks(null, null).get("SegmentGenerationAndPushTask"),
         "Task_SegmentGenerationAndPushTask_1607470525615");
     httpServer.stop(0);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 745ff30..aa3235e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -168,9 +169,18 @@ public class PinotTaskRestletResource {
 
   @POST
   @Path("/tasks/schedule")
-  @ApiOperation("Schedule tasks")
-  public Map<String, String> scheduleTasks() {
-    return _pinotTaskManager.scheduleTasks();
+  @ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
+  public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+      @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+    if (taskType != null) {
+      // Schedule task for the given task type
+      String taskName = tableName != null ? _pinotTaskManager.scheduleTask(taskType, tableName)
+          : _pinotTaskManager.scheduleTask(taskType);
+      return Collections.singletonMap(taskType, taskName);
+    } else {
+      // Schedule tasks for all task types
+      return tableName != null ? _pinotTaskManager.scheduleTasks(tableName) : _pinotTaskManager.scheduleTasks();
+    }
   }
 
   @Deprecated
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a17718..fe97326 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
@@ -33,7 +36,6 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegi
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,11 +65,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   }
 
   /**
-   * Returns the cluster info provider.
-   * <p>
-   * Cluster info provider might be useful when initializing task generators.
-   *
-   * @return Cluster info provider
+   * Returns the cluster info accessor.
+   * <p>Cluster info accessor can be used to initialize the task generator.
    */
   public ClusterInfoAccessor getClusterInfoAccessor() {
     return _clusterInfoAccessor;
@@ -75,87 +74,139 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
 
   /**
    * Registers a task generator.
-   * <p>
-   * This method can be used to plug in custom task generators.
-   *
-   * @param pinotTaskGenerator Task generator to be registered
+   * <p>This method can be used to plug in custom task generators.
    */
-  public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
-    _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
+  public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
+    _taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
   }
 
   /**
-   * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader.
+   * Public API to schedule tasks (all task types) for all tables. It might be called from the non-leader controller.
+   * Returns a map from the task type to the task scheduled.
    */
   public synchronized Map<String, String> scheduleTasks() {
-    Map<String, String> tasksScheduled = scheduleTasks(_pinotHelixResourceManager.getAllTables());
-
-    // Reset the task because this method will be called from the Rest API instead of the periodic task scheduler
-    // TODO: Clean up only the non-leader tables instead of all tables
-    cleanUpTask();
-    setUpTask();
-
-    return tasksScheduled;
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
   }
 
   /**
-   * Check the Pinot cluster status and schedule new tasks for the given tables.
-   *
-   * @param tableNamesWithType List of table names with type suffix
-   * @return Map from task type to task scheduled
+   * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
+   * from the task type to the task scheduled.
    */
-  private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType) {
+  private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
 
-    Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
-    int numTaskTypes = taskTypes.size();
-    Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(numTaskTypes);
-
-    for (String taskType : taskTypes) {
-      enabledTableConfigMap.put(taskType, new ArrayList<>());
-
-      // Ensure all task queues exist
-      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    }
-
     // Scan all table configs to get the tables with tasks enabled
+    Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
       TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig != null) {
-        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-        if (taskConfig != null) {
-          for (String taskType : taskTypes) {
-            if (taskConfig.isTaskTypeEnabled(taskType)) {
-              enabledTableConfigMap.get(taskType).add(tableConfig);
-            }
-          }
+      if (tableConfig != null && tableConfig.getTaskConfig() != null) {
+        Set<String> enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
+        for (String enabledTaskType : enabledTaskTypes) {
+          enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList<>()).add(tableConfig);
         }
       }
     }
 
     // Generate each type of tasks
-    Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
-    for (String taskType : taskTypes) {
-      LOGGER.info("Generating tasks for task type: {}", taskType);
-      PinotTaskGenerator pinotTaskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
-      List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
-      int numTasks = pinotTaskConfigs.size();
-      if (numTasks > 0) {
-        LOGGER
-            .info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
-        tasksScheduled.put(taskType, _helixTaskResourceManager
-            .submitTask(pinotTaskConfigs, pinotTaskGenerator.getTaskTimeoutMs(),
-                pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
-        _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+    Map<String, String> tasksScheduled = new HashMap<>();
+    for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
+      String taskType = entry.getKey();
+      List<TableConfig> enabledTableConfigs = entry.getValue();
+      PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+      if (taskGenerator != null) {
+        _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+        tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader));
+      } else {
+        List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size());
+        for (TableConfig enabledTableConfig : enabledTableConfigs) {
+          enabledTables.add(enabledTableConfig.getTableName());
+        }
+        LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
+        tasksScheduled.put(taskType, null);
       }
     }
 
     return tasksScheduled;
   }
 
+  /**
+   * Helper method to schedule task with the given task generator for the given tables that have the task enabled.
+   * Returns the task name, or {@code null} if no task is scheduled.
+   */
+  @Nullable
+  private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
+      boolean isLeader) {
+    List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
+    if (!isLeader) {
+      taskGenerator.nonLeaderCleanUp();
+    }
+    int numTasks = pinotTaskConfigs.size();
+    if (numTasks > 0) {
+      String taskType = taskGenerator.getTaskType();
+      LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+      return _helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(),
+          taskGenerator.getNumConcurrentTasksPerInstance());
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
+   * controller. Returns a map from the task type to the task scheduled.
+   */
+  @Nullable
+  public synchronized Map<String, String> scheduleTasks(String tableNameWithType) {
+    return scheduleTasks(Collections.singletonList(tableNameWithType), false);
+  }
+
+  /**
+   * Public API to schedule task for the given task type. It might be called from the non-leader controller. Returns the
+   * task name, or {@code null} if no task is scheduled.
+   */
+  @Nullable
+  public synchronized String scheduleTask(String taskType) {
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
+
+    // Scan all table configs to get the tables with task enabled
+    List<TableConfig> enabledTableConfigs = new ArrayList<>();
+    for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
+          .isTaskTypeEnabled(taskType)) {
+        enabledTableConfigs.add(tableConfig);
+      }
+    }
+
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    return scheduleTask(taskGenerator, enabledTableConfigs, false);
+  }
+
+  /**
+   * Public API to schedule task for the given task type on the given table. It might be called from the non-leader
+   * controller. Returns the task name, or {@code null} if no task is scheduled.
+   */
+  @Nullable
+  public synchronized String scheduleTask(String taskType, String tableNameWithType) {
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
+
+    Preconditions
+        .checkState(tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
+            "Table: %s does not have task type: %s enabled", tableNameWithType, taskType);
+
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false);
+  }
+
   @Override
   protected void processTables(List<String> tableNamesWithType) {
-    scheduleTasks(tableNamesWithType);
+    scheduleTasks(tableNamesWithType, true);
   }
 
   @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 9fa9519..6848da8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -109,15 +109,15 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
     }
 
     // Should create the task queues and generate a ConvertToRawIndexTask task with 5 child tasks
-    Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE));
+    Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
     Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(ConvertToRawIndexTask.TASK_TYPE)));
 
     // Should generate one more ConvertToRawIndexTask task with 3 child tasks
-    Assert.assertTrue(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE));
+    Assert.assertNotNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
 
     // Should not generate more tasks
-    Assert.assertFalse(_taskManager.scheduleTasks().containsKey(ConvertToRawIndexTask.TASK_TYPE));
+    Assert.assertNull(_taskManager.scheduleTasks().get(ConvertToRawIndexTask.TASK_TYPE));
 
     // Wait at most 600 seconds for all tasks COMPLETED and new segments refreshed
     TestUtils.waitForCondition(input -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 3f80e95..6079bfd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -106,13 +106,11 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt
     long offlineSegmentTime = _dateSmallestDays;
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      Assert.assertTrue(
-          _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      Assert.assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
           PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      Assert.assertFalse(
-          _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      Assert.assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
 
       expectedWatermark = expectedWatermark + 86400000;
       // Wait at most 600 seconds for all tasks COMPLETED
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 5232b7a..e0a4ecc 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -48,10 +48,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 /**
@@ -109,15 +106,16 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     HOLD.set(true);
 
     // Should create the task queues and generate a task
-    assertTrue(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE));
+    assertNotNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TestTaskGenerator.TASK_TYPE)));
 
     // Should generate one more task
-    assertTrue(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE));
+    assertNotNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
 
     // Should not generate more tasks
-    assertFalse(_taskManager.scheduleTasks().containsKey(TestTaskGenerator.TASK_TYPE));
+    assertNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
+    assertNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
 
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {
@@ -178,9 +176,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     _helixTaskResourceManager.deleteTaskQueue(TestTaskGenerator.TASK_TYPE, false);
 
     // Wait at most 60 seconds for task queue to be deleted
-    TestUtils.waitForCondition(input -> {
-      return !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE);
-    }, STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
+    TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE),
+        STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org