You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/27 04:53:22 UTC

[incubator-pinot] branch master updated: Unify the minion plug-in package regex path (#6980)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b5dcb7  Unify the minion plug-in package regex path (#6980)
0b5dcb7 is described below

commit 0b5dcb7aed1b85ab26345ab6c1076ccb88358f4a
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed May 26 21:53:00 2021 -0700

    Unify the minion plug-in package regex path (#6980)
    
    ## Description
    Unify the package regex path for the minion pluggable classes (`PinotTaskGenerator`, `PinotTaskExecutorFactory`, `MinionEventObserverFactory`) to `org.apache.pinot.*.plugin.minion.tasks.*`.
    Modify `SimpleMinionClusterIntegrationTest` to use the pluggable classes.
    
    ## Release Notes
    The regex path for the pluggable `MinionEventObserverFactory` is changed from `org.apache.pinot.*.event.*` to `org.apache.pinot.*.plugin.minion.tasks.*`.
---
 .../minion/generator/TaskGeneratorRegistry.java    |   2 +-
 .../tests/BasicAuthBatchIntegrationTest.java       |  28 ++--
 .../tests/BasicAuthRealtimeIntegrationTest.java    |   5 +-
 .../pinot/integration/tests/ClusterTest.java       |  24 +--
 ...vertToRawIndexMinionClusterIntegrationTest.java |   2 +-
 ...fflineSegmentsMinionClusterIntegrationTest.java |   2 +-
 .../tests/SimpleMinionClusterIntegrationTest.java  | 161 ++-------------------
 .../minion/tasks/TestEventObserverFactory.java     |  73 ++++++++++
 .../minion/tasks/TestTaskExecutorFactory.java      |  81 +++++++++++
 .../plugin/minion/tasks/TestTaskGenerator.java     |  72 +++++++++
 .../minion/event/EventObserverFactoryRegistry.java |   9 +-
 .../executor/TaskExecutorFactoryRegistry.java      |   6 +-
 .../annotations/minion/EventObserverFactory.java   |   2 +-
 .../annotations/minion/TaskExecutorFactory.java    |   2 +-
 .../spi/annotations/minion/TaskGenerator.java      |   2 +-
 15 files changed, 276 insertions(+), 195 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 95dee4b..083996d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -40,7 +40,7 @@ public class TaskGeneratorRegistry {
   private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new HashMap<>();
 
   /**
-   * The package regex pattern for auto-registered {@link TaskGenerator}.
+   * The package regex pattern for auto-registered {@link PinotTaskGenerator}.
    */
   public static final String TASK_GENERATOR_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
index 1bb6825..b5d0bb7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -63,7 +62,7 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
     startController();
     startBroker();
     startServer();
-    startMinion(Collections.emptyList(), Collections.emptyList());
+    startMinion();
   }
 
   @AfterClass(alwaysRun = true)
@@ -99,8 +98,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
   @Test
   public void testBrokerNoAuth()
       throws Exception {
-    JsonNode response =
-        JsonUtils.stringToJsonNode(sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}"));
+    JsonNode response = JsonUtils.stringToJsonNode(
+        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}"));
     Assert.assertFalse(response.has("resultTable"), "must not return result table");
     Assert.assertTrue(response.get("exceptions").get(0).get("errorCode").asInt() != 0, "must return error code");
   }
@@ -109,7 +108,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
   public void testBroker()
       throws Exception {
     JsonNode response = JsonUtils.stringToJsonNode(
-        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}", AUTH_HEADER));
+        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}",
+            AUTH_HEADER));
     Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG",
         "must return result with LONG value");
     Assert.assertTrue(response.get("exceptions").isEmpty(), "must not return exception");
@@ -118,7 +118,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
   @Test
   public void testControllerGetTables()
       throws Exception {
-    JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest("http://localhost:" + getControllerPort() + "/tables", AUTH_HEADER));
+    JsonNode response =
+        JsonUtils.stringToJsonNode(sendGetRequest("http://localhost:" + getControllerPort() + "/tables", AUTH_HEADER));
     Assert.assertTrue(response.get("tables").isArray(), "must return table array");
   }
 
@@ -155,18 +156,17 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
 
     // patch ingestion job file
     String jobFileContents = IOUtils.toString(new FileInputStream(jobFile));
-    IOUtils.write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())),
-        new FileOutputStream(jobFile));
+    IOUtils
+        .write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())), new FileOutputStream(jobFile));
 
-    new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), AUTH_TOKEN)
-        .execute();
+    new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), AUTH_TOKEN).execute();
 
     Thread.sleep(5000);
 
     // admin with full access
     JsonNode response = JsonUtils.stringToJsonNode(
-        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT count(*) FROM baseballStats\"}",
-            AUTH_HEADER));
+        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql",
+            "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", AUTH_HEADER));
     Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG",
         "must return result with LONG value");
     Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "count(*)",
@@ -177,8 +177,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
 
     // user with valid auth but no table access
     JsonNode responseUser = JsonUtils.stringToJsonNode(
-        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT count(*) FROM baseballStats\"}",
-            AUTH_HEADER_USER));
+        sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql",
+            "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", AUTH_HEADER_USER));
     Assert.assertFalse(responseUser.has("resultTable"), "must not return result table");
     Assert.assertTrue(responseUser.get("exceptions").get(0).get("errorCode").asInt() != 0, "must return error code");
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
index 05c2264..4a9105e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
@@ -61,7 +61,7 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest
     startController();
     startBroker();
     startServer();
-    startMinion(null, null);
+    startMinion();
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
@@ -141,8 +141,7 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest
   @Override
   protected Connection getPinotConnection() {
     if (_pinotConnection == null) {
-      _pinotConnection =
-          ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), AUTH_HEADER);
+      _pinotConnection = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), AUTH_HEADER);
     }
     return _pinotConnection;
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index fdb1d5c..83cd38b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import javax.annotation.Nullable;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -53,8 +52,6 @@ import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.minion.MinionStarter;
-import org.apache.pinot.minion.event.MinionEventObserverFactory;
-import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.server.starter.helix.DefaultHelixStarterServerConfig;
@@ -161,8 +158,7 @@ public abstract class ClusterTest extends ControllerTest {
   }
 
   protected void startServer(PinotConfiguration configuration) {
-    startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT,
-        getZkUrl());
+    startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
   }
 
   protected void startServers(int numServers) {
@@ -206,26 +202,13 @@ public abstract class ClusterTest extends ControllerTest {
 
   // NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext
   //       to manage the instance level configs
-  protected void startMinion(@Nullable List<PinotTaskExecutorFactory> taskExecutorFactories,
-      @Nullable List<MinionEventObserverFactory> eventObserverFactories) {
+  protected void startMinion() {
     FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
     try {
       PinotConfiguration minionConf = getDefaultMinionConfiguration();
       minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
       minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
       _minionStarter = new MinionStarter(minionConf);
-      // Register task executor factories
-      if (taskExecutorFactories != null) {
-        for (PinotTaskExecutorFactory taskExecutorFactory : taskExecutorFactories) {
-          _minionStarter.registerTaskExecutorFactory(taskExecutorFactory);
-        }
-      }
-      // Register event observer factories
-      if (eventObserverFactories != null) {
-        for (MinionEventObserverFactory eventObserverFactory : eventObserverFactories) {
-          _minionStarter.registerEventObserverFactory(eventObserverFactory);
-        }
-      }
       _minionStarter.start();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -420,7 +403,8 @@ public abstract class ClusterTest extends ControllerTest {
   /**
    * Queries the broker's pql query endpoint (/query)
    */
-  public static JsonNode postQuery(String query, String brokerBaseApiUrl, boolean enableTrace, String queryType, Map<String, String> headers)
+  public static JsonNode postQuery(String query, String brokerBaseApiUrl, boolean enableTrace, String queryType,
+      Map<String, String> headers)
       throws Exception {
     ObjectNode payload = JsonUtils.newObjectNode();
     payload.put(queryType, query);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 1d96c71..83312b2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -81,7 +81,7 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
     // The parent setUp() sets up Zookeeper, Kafka, controller, broker and servers
     super.setUp();
 
-    startMinion(null, null);
+    startMinion();
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 91ca0aa..6a8f654 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -71,7 +71,7 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt
     // Setup realtime table, and blank offline table
     super.setUp();
     addTableConfig(createOfflineTableConfig());
-    startMinion(null, null);
+    startMinion();
 
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index c753611..17e984c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -18,34 +18,19 @@
  */
 package org.apache.pinot.integration.tests;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
-import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.minion.event.MinionEventObserver;
-import org.apache.pinot.minion.event.MinionEventObserverFactory;
-import org.apache.pinot.minion.exception.TaskCancelledException;
-import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
-import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
-import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -59,20 +44,21 @@ import static org.testng.Assert.*;
  * minion functionality.
  */
 public class SimpleMinionClusterIntegrationTest extends ClusterTest {
-  private static final String TASK_TYPE = "TestTask";
-  private static final String TABLE_NAME_1 = "testTable1";
-  private static final String TABLE_NAME_2 = "testTable2";
-  private static final String TABLE_NAME_3 = "testTable3";
+  // Accessed by the plug-in classes
+  public static final String TASK_TYPE = "TestTask";
+  public static final String TABLE_NAME_1 = "testTable1";
+  public static final String TABLE_NAME_2 = "testTable2";
+  public static final String TABLE_NAME_3 = "testTable3";
+  public static final int NUM_TASKS = 2;
+  public static final int NUM_CONFIGS = 3;
+  public static final AtomicBoolean HOLD = new AtomicBoolean();
+  public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
+  public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
+  public static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean();
+  public static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean();
+
   private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L;  // 1 minute
   private static final long ZK_CALLBACK_TIMEOUT_MS = 30_000L;       // 30 seconds
-  private static final int NUM_TASKS = 2;
-  private static final int NUM_CONFIGS = 3;
-
-  private static final AtomicBoolean HOLD = new AtomicBoolean();
-  private static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
-  private static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
-  private static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean();
-  private static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean();
 
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotTaskManager _taskManager;
@@ -96,13 +82,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
 
-    // Register the test task generator into task manager
-    PinotTaskGenerator taskGenerator = new TestTaskGenerator();
-    taskGenerator.init(_taskManager.getClusterInfoAccessor());
-    _taskManager.registerTaskGenerator(taskGenerator);
-
-    startMinion(Collections.singletonList(new TestTaskExecutorFactory()),
-        Collections.singletonList(new TestEventObserverFactory()));
+    startMinion();
   }
 
   @Test
@@ -229,117 +209,4 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     stopController();
     stopZk();
   }
-
-  private static class TestTaskGenerator implements PinotTaskGenerator {
-
-    private ClusterInfoAccessor _clusterInfoAccessor;
-
-    @Override
-    public void init(ClusterInfoAccessor clusterInfoAccessor) {
-      _clusterInfoAccessor = clusterInfoAccessor;
-    }
-
-    @Override
-    public String getTaskType() {
-      return TASK_TYPE;
-    }
-
-    @Override
-    public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
-      assertEquals(tableConfigs.size(), NUM_TASKS);
-
-      // Generate at most 2 tasks
-      if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= NUM_TASKS) {
-        return Collections.emptyList();
-      }
-
-      List<PinotTaskConfig> taskConfigs = new ArrayList<>();
-      for (TableConfig tableConfig : tableConfigs) {
-        Map<String, String> configs = new HashMap<>();
-        configs.put("tableName", tableConfig.getTableName());
-        configs.put("tableType", tableConfig.getTableType().toString());
-        taskConfigs.add(new PinotTaskConfig(TASK_TYPE, configs));
-      }
-      return taskConfigs;
-    }
-  }
-
-  public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
-
-    @Override
-    public void init(MinionTaskZkMetadataManager zkMetadataManager) {
-    }
-
-    @Override
-    public String getTaskType() {
-      return TASK_TYPE;
-    }
-
-    @Override
-    public PinotTaskExecutor create() {
-      return new BaseTaskExecutor() {
-        @Override
-        public Boolean executeTask(PinotTaskConfig pinotTaskConfig) {
-          assertTrue(MINION_CONTEXT.getDataDir().exists());
-          assertNotNull(MINION_CONTEXT.getMinionMetrics());
-          assertNotNull(MINION_CONTEXT.getHelixPropertyStore());
-
-          assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE);
-          Map<String, String> configs = pinotTaskConfig.getConfigs();
-          assertEquals(configs.size(), NUM_CONFIGS);
-          String offlineTableName = configs.get("tableName");
-          assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE);
-          String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
-          assertTrue(rawTableName.equals(TABLE_NAME_1) || rawTableName.equals(TABLE_NAME_2));
-          assertEquals(configs.get("tableType"), TableType.OFFLINE.toString());
-
-          do {
-            if (_cancelled) {
-              throw new TaskCancelledException("Task has been cancelled");
-            }
-          } while (HOLD.get());
-          return true;
-        }
-      };
-    }
-  }
-
-  public static class TestEventObserverFactory implements MinionEventObserverFactory {
-
-    @Override
-    public void init(MinionTaskZkMetadataManager zkMetadataManager) {
-    }
-
-    @Override
-    public String getTaskType() {
-      return TASK_TYPE;
-    }
-
-    @Override
-    public MinionEventObserver create() {
-      return new MinionEventObserver() {
-        @Override
-        public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
-          TASK_START_NOTIFIED.set(true);
-        }
-
-        @Override
-        public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) {
-          assertTrue(executionResult instanceof Boolean);
-          assertTrue((Boolean) executionResult);
-          TASK_SUCCESS_NOTIFIED.set(true);
-        }
-
-        @Override
-        public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
-          TASK_CANCELLED_NOTIFIED.set(true);
-        }
-
-        @Override
-        public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) {
-          TASK_ERROR_NOTIFIED.set(true);
-        }
-      };
-    }
-  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java
new file mode 100644
index 0000000..91ff5f1
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.plugin.minion.tasks;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Event observer factory for {@link SimpleMinionClusterIntegrationTest}.
+ */
+@EventObserverFactory
+public class TestEventObserverFactory implements MinionEventObserverFactory {
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return SimpleMinionClusterIntegrationTest.TASK_TYPE;
+  }
+
+  @Override
+  public MinionEventObserver create() {
+    return new MinionEventObserver() {
+      @Override
+      public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+        SimpleMinionClusterIntegrationTest.TASK_START_NOTIFIED.set(true);
+      }
+
+      @Override
+      public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) {
+        assertTrue(executionResult instanceof Boolean);
+        assertTrue((Boolean) executionResult);
+        SimpleMinionClusterIntegrationTest.TASK_SUCCESS_NOTIFIED.set(true);
+      }
+
+      @Override
+      public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
+        SimpleMinionClusterIntegrationTest.TASK_CANCELLED_NOTIFIED.set(true);
+      }
+
+      @Override
+      public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) {
+        SimpleMinionClusterIntegrationTest.TASK_ERROR_NOTIFIED.set(true);
+      }
+    };
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
new file mode 100644
index 0000000..d14509a
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.plugin.minion.tasks;
+
+import java.util.Map;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Task executor factory for {@link SimpleMinionClusterIntegrationTest}.
+ */
+@TaskExecutorFactory
+public class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return SimpleMinionClusterIntegrationTest.TASK_TYPE;
+  }
+
+  @Override
+  public PinotTaskExecutor create() {
+    return new BaseTaskExecutor() {
+      @Override
+      public Boolean executeTask(PinotTaskConfig pinotTaskConfig) {
+        assertTrue(MINION_CONTEXT.getDataDir().exists());
+        assertNotNull(MINION_CONTEXT.getMinionMetrics());
+        assertNotNull(MINION_CONTEXT.getHelixPropertyStore());
+
+        assertEquals(pinotTaskConfig.getTaskType(), SimpleMinionClusterIntegrationTest.TASK_TYPE);
+        Map<String, String> configs = pinotTaskConfig.getConfigs();
+        assertEquals(configs.size(), SimpleMinionClusterIntegrationTest.NUM_CONFIGS);
+        String offlineTableName = configs.get("tableName");
+        assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE);
+        String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
+        assertTrue(rawTableName.equals(SimpleMinionClusterIntegrationTest.TABLE_NAME_1) || rawTableName
+            .equals(SimpleMinionClusterIntegrationTest.TABLE_NAME_2));
+        assertEquals(configs.get("tableType"), TableType.OFFLINE.toString());
+
+        do {
+          if (_cancelled) {
+            throw new TaskCancelledException("Task has been cancelled");
+          }
+        } while (SimpleMinionClusterIntegrationTest.HOLD.get());
+        return true;
+      }
+    };
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
new file mode 100644
index 0000000..740d3e5
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.plugin.minion.tasks;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Task generator for {@link SimpleMinionClusterIntegrationTest}.
+ */
+@TaskGenerator
+public class TestTaskGenerator implements PinotTaskGenerator {
+  private ClusterInfoAccessor _clusterInfoAccessor;
+
+  @Override
+  public void init(ClusterInfoAccessor clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
+  }
+
+  @Override
+  public String getTaskType() {
+    return SimpleMinionClusterIntegrationTest.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    assertEquals(tableConfigs.size(), SimpleMinionClusterIntegrationTest.NUM_TASKS);
+
+    // Generate at most 2 tasks
+    if (_clusterInfoAccessor.getTaskStates(SimpleMinionClusterIntegrationTest.TASK_TYPE).size()
+        >= SimpleMinionClusterIntegrationTest.NUM_TASKS) {
+      return Collections.emptyList();
+    }
+
+    List<PinotTaskConfig> taskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tableName", tableConfig.getTableName());
+      configs.put("tableType", tableConfig.getTableType().toString());
+      taskConfigs.add(new PinotTaskConfig(SimpleMinionClusterIntegrationTest.TASK_TYPE, configs));
+    }
+    return taskConfigs;
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
index 716bdd8..d220f8d 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
@@ -34,6 +34,11 @@ import org.slf4j.LoggerFactory;
 public class EventObserverFactoryRegistry {
   private static final Logger LOGGER = LoggerFactory.getLogger(EventObserverFactoryRegistry.class);
 
+  /**
+   * The package regex pattern for auto-registered {@link MinionEventObserverFactory}.
+   */
+  private static final String EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
+
   private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>();
 
   /**
@@ -43,8 +48,8 @@ public class EventObserverFactoryRegistry {
    */
   public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
     long startTimeMs = System.currentTimeMillis();
-    Set<Class<?>> classes =
-        PinotReflectionUtils.getClassesThroughReflection(".*\\.event\\..*", EventObserverFactory.class);
+    Set<Class<?>> classes = PinotReflectionUtils
+        .getClassesThroughReflection(EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN, EventObserverFactory.class);
     for (Class<?> clazz : classes) {
       EventObserverFactory annotation = clazz.getAnnotation(EventObserverFactory.class);
       if (annotation.enabled()) {
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 6b5f2c2..3ba1c40 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -34,9 +34,9 @@ public class TaskExecutorFactoryRegistry {
   private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorFactoryRegistry.class);
 
   /**
-   * The package regex pattern for auto-registered {@link TaskExecutorFactory}.
+   * The package regex pattern for auto-registered {@link PinotTaskExecutorFactory}.
    */
-  private static final String TASK_EXECUTOR_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
+  private static final String TASK_EXECUTOR_FACTORY_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
 
   private final Map<String, PinotTaskExecutorFactory> _taskExecutorFactoryRegistry = new HashMap<>();
 
@@ -68,7 +68,7 @@ public class TaskExecutorFactoryRegistry {
 
   public static Set<Class<?>> getTaskExecutorFactoryClasses() {
     return PinotReflectionUtils
-        .getClassesThroughReflection(TASK_EXECUTOR_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class);
+        .getClassesThroughReflection(TASK_EXECUTOR_FACTORY_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class);
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
index 2aa8d68..df85a32 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
@@ -29,7 +29,7 @@ import java.lang.annotation.Target;
  *
  * NOTE:
  *   - The annotated class must implement the MinionEventObserverFactory interface
- *   - The annotated class must be under the package of name 'org.apache.pinot.*.event.*' to be auto-registered.
+ *   - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.TYPE)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
index 38166b0..a4c6c4d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
@@ -29,7 +29,7 @@ import java.lang.annotation.Target;
  *
  * NOTE:
  *   - The annotated class must implement the PinotTaskExecutorFactory interface
- *   - The annotated class must be under the package of name 'org.apache.pinot.*.executor.*' to be auto-registered.
+ *   - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.TYPE)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
index 33c7511..c615708 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
@@ -29,7 +29,7 @@ import java.lang.annotation.Target;
  *
  * NOTE:
  *   - The annotated class must implement the PinotTaskGenerator interface
- *   - The annotated class must be under the package of name 'org.apache.pinot.plugin.minion.tasks.*' to be auto-registered.
+ *   - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.TYPE)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org