You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/01/05 20:36:02 UTC

[incubator-pinot] branch master updated: Make minion tasks pluggable via reflection (#6395)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8def748  Make minion tasks pluggable via reflection (#6395)
8def748 is described below

commit 8def7483b0f0d0cd09c5536f5792626d2d1f3c52
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jan 5 12:35:41 2021 -0800

    Make minion tasks pluggable via reflection (#6395)
    
    Introduces 3 annotations to plug in minion components:
    - TaskGenerator: For task generators to be plugged on controller side
    - TaskExecutorFactory: For task executor factories to be plugged on minion side
    - EventObserverFactory: For event observer factories to be plugged on minion side
    
    Annotated classes will automatically be plugged into the controller and minion.
---
 .../generator/ConvertToRawIndexTaskGenerator.java  |  7 +-
 .../core/minion/generator/PinotTaskGenerator.java  | 14 ++--
 .../RealtimeToOfflineSegmentsTaskGenerator.java    | 10 ++-
 .../SegmentGenerationAndPushTaskGenerator.java     | 32 ++++----
 .../minion/generator/TaskGeneratorRegistry.java    | 59 ++++++++++-----
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 43 ++++++-----
 .../pinot/integration/tests/ClusterTest.java       | 66 +++++++++--------
 .../tests/SimpleMinionClusterIntegrationTest.java  | 71 +++++++++++-------
 .../org/apache/pinot/minion/MinionStarter.java     | 25 +++----
 .../DefaultMinionEventObserver.java                |  2 +-
 .../DefaultMinionEventObserverFactory.java         | 18 ++++-
 .../minion/event/EventObserverFactoryRegistry.java | 85 ++++++++++++++++++++++
 .../{events => event}/MinionEventObserver.java     |  2 +-
 .../MinionEventObserverFactory.java                | 19 ++++-
 .../events/EventObserverFactoryRegistry.java       | 50 -------------
 .../ConvertToRawIndexTaskExecutorFactory.java      | 14 ++++
 .../executor/MergeRollupTaskExecutorFactory.java   | 15 ++++
 .../minion/executor/PinotTaskExecutorFactory.java  | 14 +++-
 .../minion/executor/PurgeTaskExecutorFactory.java  | 14 ++++
 ...altimeToOfflineSegmentsTaskExecutorFactory.java | 18 ++++-
 ...egmentGenerationAndPushTaskExecutorFactory.java | 15 ++++
 .../executor/TaskExecutorFactoryRegistry.java      | 62 ++++++++++------
 .../minion/taskfactory/TaskFactoryRegistry.java    | 22 +++---
 .../annotations/minion/EventObserverFactory.java   | 29 ++++----
 .../annotations/minion/TaskExecutorFactory.java    | 29 ++++----
 .../spi/annotations/minion/TaskGenerator.java      | 25 ++++---
 26 files changed, 489 insertions(+), 271 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index 437ac93..ed7e6b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -36,12 +37,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+@TaskGenerator
 public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
 
-  private final ClusterInfoAccessor _clusterInfoAccessor;
+  private ClusterInfoAccessor _clusterInfoAccessor;
 
-  public ConvertToRawIndexTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+  @Override
+  public void init(ClusterInfoAccessor clusterInfoAccessor) {
     _clusterInfoAccessor = clusterInfoAccessor;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 6fe55ee..85eed0e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.minion.generator;
 
 import java.util.List;
 import org.apache.helix.task.JobConfig;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 
@@ -30,23 +31,22 @@ import org.apache.pinot.spi.config.table.TableConfig;
 public interface PinotTaskGenerator {
 
   /**
+   * Initializes the task generator.
+   */
+  void init(ClusterInfoAccessor clusterInfoAccessor);
+
+  /**
    * Returns the task type of the generator.
-   *
-   * @return Task type of the generator
    */
   String getTaskType();
 
   /**
    * Generates a list of tasks to schedule based on the given table configs.
-   *
-   * @return List of tasks to schedule
    */
   List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
 
   /**
    * Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
-   *
-   * @return Timeout in milliseconds for each task.
    */
   default long getTaskTimeoutMs() {
     return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
@@ -54,8 +54,6 @@ public interface PinotTaskGenerator {
 
   /**
    * Returns the maximum number of concurrent tasks allowed per instance, 1 by default.
-   *
-   * @return Maximum number of concurrent tasks allowed per instance
    */
   default int getNumConcurrentTasksPerInstance() {
     return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index a0e519b..a278396 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -35,6 +35,7 @@ import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -72,15 +73,17 @@ import org.slf4j.LoggerFactory;
  *
  *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
  */
+@TaskGenerator
 public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
 
   private static final String DEFAULT_BUCKET_PERIOD = "1d";
   private static final String DEFAULT_BUFFER_PERIOD = "2d";
 
-  private final ClusterInfoAccessor _clusterInfoAccessor;
+  private ClusterInfoAccessor _clusterInfoAccessor;
 
-  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+  @Override
+  public void init(ClusterInfoAccessor clusterInfoAccessor) {
     _clusterInfoAccessor = clusterInfoAccessor;
   }
 
@@ -101,7 +104,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
         LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, realtimeTableName);
         continue;
       }
-      StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      StreamConfig streamConfig =
+          new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
       if (streamConfig.hasHighLevelConsumerType()) {
         LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}", taskType, realtimeTableName);
         continue;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index c3b3f43..7921e09 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -83,13 +84,16 @@ import org.slf4j.LoggerFactory;
  *   push.segmentUriSuffix - Optional, segment download uri suffix, used when push.mode=uri.
  *
  */
+@TaskGenerator
 public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
+  private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =
+      BatchConfigProperties.SegmentPushType.TAR;
 
-  private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE = BatchConfigProperties.SegmentPushType.TAR;
-  private final ClusterInfoAccessor _clusterInfoAccessor;
+  private ClusterInfoAccessor _clusterInfoAccessor;
 
-  public SegmentGenerationAndPushTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+  @Override
+  public void init(ClusterInfoAccessor clusterInfoAccessor) {
     _clusterInfoAccessor = clusterInfoAccessor;
   }
 
@@ -114,9 +118,7 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
       Preconditions.checkNotNull(tableTaskConfig);
       Map<String, String> taskConfigs =
           tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
-      if (tableConfigs == null) {
-        LOGGER.warn("Skip null task config for table: {}", offlineTableName);
-      }
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: {}", offlineTableName);
 
       // Get max number of tasks for this table
       int tableMaxNumTasks;
@@ -181,7 +183,7 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
     URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
     URI outputDirURI = null;
     if (batchConfigMap.containsKey(BatchConfigProperties.OUTPUT_DIR_URI)) {
-     outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
+      outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
     }
     String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
 
@@ -191,12 +193,13 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
       URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI);
       singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
     }
-    singleFileGenerationTaskConfig
-        .put(BatchConfigProperties.SCHEMA, JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
-    singleFileGenerationTaskConfig
-        .put(BatchConfigProperties.TABLE_CONFIGS, JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
+    singleFileGenerationTaskConfig.put(BatchConfigProperties.SCHEMA,
+        JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
+    singleFileGenerationTaskConfig.put(BatchConfigProperties.TABLE_CONFIGS,
+        JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
     singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(sequenceID));
-    singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+    singleFileGenerationTaskConfig
+        .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
     if ((outputDirURI == null) || (pushMode == null)) {
       singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, DEFAULT_SEGMENT_PUSH_TYPE.toString());
     } else {
@@ -276,8 +279,8 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
       throws URISyntaxException {
     URI fileURI = URI.create(uriStr);
     if (fileURI.getScheme() == null) {
-      return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getAuthority(),
-          fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment());
+      return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getAuthority(), fileURI.getPath(),
+          fileURI.getQuery(), fileURI.getFragment());
     }
     return fileURI;
   }
@@ -311,5 +314,4 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
     URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
     return relativeOutputURI;
   }
-
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 21070d3..7aa7500 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -22,29 +22,59 @@ import com.clearspring.analytics.util.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Registry for all {@link PinotTaskGenerator}.
  */
 public class TaskGeneratorRegistry {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TaskGeneratorRegistry.class);
+
   private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new HashMap<>();
 
-  public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
-    registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
-    registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
-    registerTaskGenerator(new SegmentGenerationAndPushTaskGenerator(clusterInfoAccessor));
+  /**
+   * Registers the task generators via reflection.
+   * NOTE: In order to plug in a class using reflection, the class should include ".generator." in its fully
+   *       qualified class name. This convention can significantly reduce the time of class scanning.
+   */
+  public TaskGeneratorRegistry(ClusterInfoAccessor clusterInfoAccessor) {
+    long startTimeMs = System.currentTimeMillis();
+    Reflections reflections = new Reflections(
+        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
+            .filterInputsBy(new FilterBuilder.Include(".*\\.generator\\..*"))
+            .setScanners(new TypeAnnotationsScanner()));
+    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(TaskGenerator.class, true);
+    for (Class<?> clazz : classes) {
+      TaskGenerator annotation = clazz.getAnnotation(TaskGenerator.class);
+      if (annotation.enabled()) {
+        try {
+          PinotTaskGenerator taskGenerator = (PinotTaskGenerator) clazz.newInstance();
+          taskGenerator.init(clusterInfoAccessor);
+          registerTaskGenerator(taskGenerator);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while initializing and registering task generator: {}, skipping it", clazz, e);
+        }
+      }
+    }
+    LOGGER.info("Initialized TaskGeneratorRegistry with {} task generators: {} in {}ms", _taskGeneratorRegistry.size(),
+        _taskGeneratorRegistry.keySet(), System.currentTimeMillis() - startTimeMs);
   }
 
   /**
    * Register a task generator.
-   *
-   * @param pinotTaskGenerator Task generator to be registered
    */
-  public void registerTaskGenerator(@Nonnull PinotTaskGenerator pinotTaskGenerator) {
+  public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
     // Task type cannot contain the task name separator
     String taskType = pinotTaskGenerator.getTaskType();
     Preconditions.checkArgument(!taskType.contains(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR),
@@ -54,22 +84,17 @@ public class TaskGeneratorRegistry {
   }
 
   /**
-   * Get all registered task types.
-   *
-   * @return Set of all registered task types
+   * Returns all registered task types.
    */
-  @Nonnull
   public Set<String> getAllTaskTypes() {
     return _taskGeneratorRegistry.keySet();
   }
 
   /**
-   * Get the task generator for the given task type.
-   *
-   * @param taskType Task type
-   * @return Task generator for the given task type
+   * Returns the task generator for the given task type.
    */
-  public PinotTaskGenerator getTaskGenerator(@Nonnull String taskType) {
+  @Nullable
+  public PinotTaskGenerator getTaskGenerator(String taskType) {
     return _taskGeneratorRegistry.get(taskType);
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 5aa3377..38c724d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -88,8 +88,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(metadata1));
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
 
     // Skip task generation, if offline table
     TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -141,8 +141,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(metadata1));
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
 
     // if same task and table, IN_PROGRESS, then don't generate again
     taskStatesMap.put(taskName, TaskState.IN_PROGRESS);
@@ -177,8 +177,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
 
@@ -188,7 +188,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(seg1));
 
-    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
 
@@ -200,7 +201,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(seg1, seg2, seg3));
 
-    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
   }
@@ -227,8 +229,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
     TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
     assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -246,7 +248,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(seg1, seg2));
-    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
     assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -281,8 +284,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
     TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
     assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -296,7 +299,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     // No segments match
     when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
         .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L)); // 26 May 2020 UTC
-    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 0);
 
@@ -327,7 +331,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     taskConfigs.put("m1" + RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "MAX");
     taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
     realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
-    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
     assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -365,8 +370,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(metadata1, metadata2));
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
 
     // last COMPLETED segment's endTime is less than windowEnd time. CONSUMING segment overlap. Skip task
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
@@ -413,8 +418,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
         .thenReturn(Lists.newArrayList(metadata1));
 
-    RealtimeToOfflineSegmentsTaskGenerator generator =
-        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
 
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index ac0f94c..e98a875 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.integration.tests;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -37,9 +35,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
 import javax.annotation.Nullable;
-
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -54,7 +50,6 @@ import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.core.requesthandler.PinotQueryRequest;
 import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Minion;
@@ -62,8 +57,9 @@ import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.core.requesthandler.PinotQueryRequest;
 import org.apache.pinot.minion.MinionStarter;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
@@ -77,8 +73,9 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 /**
@@ -164,8 +161,8 @@ public abstract class ClusterTest extends ControllerTest {
     try {
       for (int i = 0; i < numServers; i++) {
         configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
-        configuration.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
-            Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
+        configuration
+            .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
         configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
         configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
         HelixServerStarter helixServerStarter = new HelixServerStarter(getHelixClusterName(), zkStr, configuration);
@@ -179,22 +176,21 @@ public abstract class ClusterTest extends ControllerTest {
 
   // NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext
   //       to manage the instance level configs
-  protected void startMinion(@Nullable Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry,
-      @Nullable Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry) {
+  protected void startMinion(@Nullable List<PinotTaskExecutorFactory> taskExecutorFactories,
+      @Nullable List<MinionEventObserverFactory> eventObserverFactories) {
     FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
     try {
-      _minionStarter =
-          new MinionStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, new PinotConfiguration());
+      _minionStarter = new MinionStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, new PinotConfiguration());
       // Register task executor factories
-      if (taskExecutorFactoryRegistry != null) {
-        for (Map.Entry<String, PinotTaskExecutorFactory> entry : taskExecutorFactoryRegistry.entrySet()) {
-          _minionStarter.registerTaskExecutorFactory(entry.getKey(), entry.getValue());
+      if (taskExecutorFactories != null) {
+        for (PinotTaskExecutorFactory taskExecutorFactory : taskExecutorFactories) {
+          _minionStarter.registerTaskExecutorFactory(taskExecutorFactory);
         }
       }
       // Register event observer factories
-      if (eventObserverFactoryRegistry != null) {
-        for (Map.Entry<String, MinionEventObserverFactory> entry : eventObserverFactoryRegistry.entrySet()) {
-          _minionStarter.registerEventObserverFactory(entry.getKey(), entry.getValue());
+      if (eventObserverFactories != null) {
+        for (MinionEventObserverFactory eventObserverFactory : eventObserverFactories) {
+          _minionStarter.registerEventObserverFactory(eventObserverFactory);
         }
       }
       _minionStarter.start();
@@ -264,11 +260,13 @@ public abstract class ClusterTest extends ControllerTest {
       if (numSegments == 1) {
         File segmentTarFile = segmentTarFiles[0];
         if (System.currentTimeMillis() % 2 == 0) {
-          assertEquals(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
+          assertEquals(fileUploadDownloadClient
+                  .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
               HttpStatus.SC_OK);
         } else {
           assertEquals(
-              uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile), HttpStatus.SC_OK);
+              uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile),
+              HttpStatus.SC_OK);
         }
       } else {
         // Upload all segments in parallel
@@ -277,9 +275,12 @@ public abstract class ClusterTest extends ControllerTest {
         for (File segmentTarFile : segmentTarFiles) {
           futures.add(executorService.submit(() -> {
             if (System.currentTimeMillis() % 2 == 0) {
-              return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode();
+              return fileUploadDownloadClient
+                  .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName)
+                  .getStatusCode();
             } else {
-              return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile);
+              return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient,
+                  segmentTarFile);
             }
           }));
         }
@@ -295,14 +296,17 @@ public abstract class ClusterTest extends ControllerTest {
       FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
       throws IOException, HttpErrorStatusException {
     List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
-            "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder.encode(segmentTarFile.getName(), StandardCharsets.UTF_8.toString())),
-        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+            "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder
+                .encode(segmentTarFile.getName(), StandardCharsets.UTF_8.toString())),
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+            FileUploadDownloadClient.FileUploadType.METADATA.toString()));
     // Add table name as a request parameter
-    NameValuePair
-        tableNameValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+    NameValuePair tableNameValuePair =
+        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
     List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
     return fileUploadDownloadClient
-        .uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters, fileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+        .uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters,
+            fileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
   }
 
   public static class AvroFileSchemaKafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index e0a4ecc..b2c2674 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -32,10 +32,11 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.minion.events.MinionEventObserver;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.minion.executor.BaseTaskExecutor;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -56,6 +57,7 @@ import static org.testng.Assert.*;
  * minion functionality.
  */
 public class SimpleMinionClusterIntegrationTest extends ClusterTest {
+  private static final String TASK_TYPE = "TestTask";
   private static final String TABLE_NAME_1 = "testTable1";
   private static final String TABLE_NAME_2 = "testTable2";
   private static final String TABLE_NAME_3 = "testTable3";
@@ -79,8 +81,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     startServer();
 
     // Add 3 offline tables, where 2 of them have TestTask enabled
-    TableTaskConfig taskConfig =
-        new TableTaskConfig(Collections.singletonMap(TestTaskGenerator.TASK_TYPE, Collections.emptyMap()));
+    TableTaskConfig taskConfig = new TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap()));
     addTableConfig(
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(taskConfig).build());
     addTableConfig(
@@ -91,13 +92,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     _taskManager = _controllerStarter.getTaskManager();
 
     // Register the test task generator into task manager
-    _taskManager.registerTaskGenerator(new TestTaskGenerator(_taskManager.getClusterInfoAccessor()));
+    PinotTaskGenerator taskGenerator = new TestTaskGenerator();
+    taskGenerator.init(_taskManager.getClusterInfoAccessor());
+    _taskManager.registerTaskGenerator(taskGenerator);
 
-    Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry =
-        Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestTaskExecutorFactory());
-    Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry =
-        Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestEventObserverFactory());
-    startMinion(taskExecutorFactoryRegistry, eventObserverFactoryRegistry);
+    startMinion(Collections.singletonList(new TestTaskExecutorFactory()),
+        Collections.singletonList(new TestEventObserverFactory()));
   }
 
   @Test
@@ -106,20 +106,20 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     HOLD.set(true);
 
     // Should create the task queues and generate a task
-    assertNotNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
+    assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
-        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TestTaskGenerator.TASK_TYPE)));
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
 
     // Should generate one more task
-    assertNotNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
+    assertNotNull(_taskManager.scheduleTask(TASK_TYPE));
 
     // Should not generate more tasks
-    assertNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
-    assertNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
+    assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
+    assertNull(_taskManager.scheduleTask(TASK_TYPE));
 
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {
-      Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TestTaskGenerator.TASK_TYPE).values();
+      Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
       assertEquals(taskStates.size(), 2);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.IN_PROGRESS) {
@@ -134,11 +134,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS");
 
     // Stop the task queue
-    _helixTaskResourceManager.stopTaskQueue(TestTaskGenerator.TASK_TYPE);
+    _helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
 
     // Wait at most 60 seconds for all tasks STOPPED
     TestUtils.waitForCondition(input -> {
-      Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TestTaskGenerator.TASK_TYPE).values();
+      Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
       assertEquals(taskStates.size(), 2);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.STOPPED) {
@@ -153,12 +153,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
 
     // Resume the task queue, and let the task complete
-    _helixTaskResourceManager.resumeTaskQueue(TestTaskGenerator.TASK_TYPE);
+    _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
     HOLD.set(false);
 
     // Wait at most 60 seconds for all tasks COMPLETED
     TestUtils.waitForCondition(input -> {
-      Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TestTaskGenerator.TASK_TYPE).values();
+      Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
       assertEquals(taskStates.size(), 2);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.COMPLETED) {
@@ -173,10 +173,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
 
     // Delete the task queue
-    _helixTaskResourceManager.deleteTaskQueue(TestTaskGenerator.TASK_TYPE, false);
+    _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
 
     // Wait at most 60 seconds for task queue to be deleted
-    TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE),
+    TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE),
         STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
   }
 
@@ -194,11 +194,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
   }
 
   private static class TestTaskGenerator implements PinotTaskGenerator {
-    public static final String TASK_TYPE = "TestTask";
 
-    private final ClusterInfoAccessor _clusterInfoAccessor;
+    private ClusterInfoAccessor _clusterInfoAccessor;
 
-    public TestTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+    @Override
+    public void init(ClusterInfoAccessor clusterInfoAccessor) {
       _clusterInfoAccessor = clusterInfoAccessor;
     }
 
@@ -228,6 +228,16 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
   }
 
   public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+    @Override
+    public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+    }
+
+    @Override
+    public String getTaskType() {
+      return TASK_TYPE;
+    }
+
     @Override
     public PinotTaskExecutor create() {
       return new BaseTaskExecutor() {
@@ -237,7 +247,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
           assertNotNull(MINION_CONTEXT.getMinionMetrics());
           assertNotNull(MINION_CONTEXT.getHelixPropertyStore());
 
-          assertEquals(pinotTaskConfig.getTaskType(), TestTaskGenerator.TASK_TYPE);
+          assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE);
           Map<String, String> configs = pinotTaskConfig.getConfigs();
           assertEquals(configs.size(), 2);
           String offlineTableName = configs.get("tableName");
@@ -260,6 +270,15 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
   public static class TestEventObserverFactory implements MinionEventObserverFactory {
 
     @Override
+    public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+    }
+
+    @Override
+    public String getTaskType() {
+      return TASK_TYPE;
+    }
+
+    @Override
     public MinionEventObserver create() {
       return new MinionEventObserver() {
         @Override
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 375b73a..c8d4883 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -18,13 +18,10 @@
  */
 package org.apache.pinot.minion;
 
-import static org.apache.pinot.common.utils.CommonConstants.HTTPS_PROTOCOL;
-
+import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.IOException;
-
 import javax.net.ssl.SSLContext;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
@@ -40,8 +37,8 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
@@ -56,7 +53,7 @@ import org.apache.pinot.spi.services.ServiceStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import static org.apache.pinot.common.utils.CommonConstants.HTTPS_PROTOCOL;
 
 
 /**
@@ -78,13 +75,13 @@ public class MinionStarter implements ServiceStartable {
       throws Exception {
     _config = config;
     _instanceId = config.getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
-        CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE + NetUtil.getHostAddress() + "_"
-            + _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT));
+        CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE + NetUtil.getHostAddress() + "_" + _config
+            .getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT));
     setupHelixSystemProperties();
     _helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
     MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
     _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
-    _eventObserverFactoryRegistry = new EventObserverFactoryRegistry();
+    _eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
   }
 
   private void setupHelixSystemProperties() {
@@ -100,16 +97,16 @@ public class MinionStarter implements ServiceStartable {
    * Registers a task executor factory.
    * <p>This is for pluggable task executor factories.
    */
-  public void registerTaskExecutorFactory(String taskType, PinotTaskExecutorFactory taskExecutorFactory) {
-    _taskExecutorFactoryRegistry.registerTaskExecutorFactory(taskType, taskExecutorFactory);
+  public void registerTaskExecutorFactory(PinotTaskExecutorFactory taskExecutorFactory) {
+    _taskExecutorFactoryRegistry.registerTaskExecutorFactory(taskExecutorFactory);
   }
 
   /**
    * Registers an event observer factory.
    * <p>This is for pluggable event observer factories.
    */
-  public void registerEventObserverFactory(String taskType, MinionEventObserverFactory eventObserverFactory) {
-    _eventObserverFactoryRegistry.registerEventObserverFactory(taskType, eventObserverFactory);
+  public void registerEventObserverFactory(MinionEventObserverFactory eventObserverFactory) {
+    _eventObserverFactoryRegistry.registerEventObserverFactory(eventObserverFactory);
   }
 
   @Override
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserver.java
similarity index 97%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserver.java
rename to pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserver.java
index de7e44c..600f9b4 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserver.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserver.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
 
 import javax.annotation.Nullable;
 import org.apache.pinot.core.minion.PinotTaskConfig;
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserverFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserverFactory.java
similarity index 75%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserverFactory.java
rename to pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserverFactory.java
index 900e74d..7129d4c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserverFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserverFactory.java
@@ -16,11 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
+
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+
 
 public class DefaultMinionEventObserverFactory implements MinionEventObserverFactory {
-  private static final MinionEventObserver DEFAULT_EVENT_OBSERVER = new DefaultMinionEventObserver();
   private static final DefaultMinionEventObserverFactory INSTANCE = new DefaultMinionEventObserverFactory();
+  private static final DefaultMinionEventObserver OBSERVER_INSTANCE = new DefaultMinionEventObserver();
 
   private DefaultMinionEventObserverFactory() {
   }
@@ -30,7 +33,16 @@ public class DefaultMinionEventObserverFactory implements MinionEventObserverFac
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return null;
+  }
+
+  @Override
   public MinionEventObserver create() {
-    return DEFAULT_EVENT_OBSERVER;
+    return OBSERVER_INSTANCE;
   }
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
new file mode 100644
index 0000000..fa4815b
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.event;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Registry for all {@link MinionEventObserverFactory}.
+ */
+public class EventObserverFactoryRegistry {
+  private static final Logger LOGGER = LoggerFactory.getLogger(EventObserverFactoryRegistry.class);
+
+  private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>();
+
+  /**
+   * Registers the event observer factories via reflection.
+   * NOTE: In order to plug in a class using reflection, the class should include ".event." in its class path. This
+   *       convention can significantly reduce the time of class scanning.
+   */
+  public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
+    long startTimeMs = System.currentTimeMillis();
+    Reflections reflections = new Reflections(
+        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
+            .filterInputsBy(new FilterBuilder.Include(".*\\.event\\..*")).setScanners(new TypeAnnotationsScanner()));
+    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(EventObserverFactory.class, true);
+    for (Class<?> clazz : classes) {
+      EventObserverFactory annotation = clazz.getAnnotation(EventObserverFactory.class);
+      if (annotation.enabled()) {
+        try {
+          MinionEventObserverFactory eventObserverFactory = (MinionEventObserverFactory) clazz.newInstance();
+          eventObserverFactory.init(zkMetadataManager);
+          registerEventObserverFactory(eventObserverFactory);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while initializing and registering event observer factory: {}, skipping it",
+              clazz, e);
+        }
+      }
+    }
+    LOGGER.info("Initialized EventObserverFactoryRegistry with {} event observer factories: {} in {}ms",
+        _eventObserverFactoryRegistry.size(), _eventObserverFactoryRegistry.keySet(),
+        System.currentTimeMillis() - startTimeMs);
+  }
+
+  /**
+   * Registers an event observer factory.
+   */
+  public void registerEventObserverFactory(MinionEventObserverFactory eventObserverFactory) {
+    _eventObserverFactoryRegistry.put(eventObserverFactory.getTaskType(), eventObserverFactory);
+  }
+
+  /**
+   * Returns the event observer factory for the given task type, or the default factory if none is registered.
+   */
+  public MinionEventObserverFactory getEventObserverFactory(String taskType) {
+    return _eventObserverFactoryRegistry.getOrDefault(taskType, DefaultMinionEventObserverFactory.getInstance());
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
similarity index 97%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserver.java
rename to pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
index 6e68472..985ff8c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserver.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
 
 import javax.annotation.Nullable;
 import org.apache.pinot.core.minion.PinotTaskConfig;
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserverFactory.java
similarity index 72%
copy from pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
copy to pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserverFactory.java
index 77d5a5b..329183c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserverFactory.java
@@ -16,7 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
+
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+
 
 /**
  * Factory for {@link MinionEventObserver}.
@@ -24,9 +27,17 @@ package org.apache.pinot.minion.events;
 public interface MinionEventObserverFactory {
 
   /**
-   * Creates an new instance of {@link MinionEventObserver}.
-   *
-   * @return Minion event observer
+   * Initializes the event observer factory.
+   */
+  void init(MinionTaskZkMetadataManager zkMetadataManager);
+
+  /**
+   * Returns the task type of the event observer.
+   */
+  String getTaskType();
+
+  /**
+   * Creates a new task event observer.
    */
   MinionEventObserver create();
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/events/EventObserverFactoryRegistry.java
deleted file mode 100644
index 8a4e2d7..0000000
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/EventObserverFactoryRegistry.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.minion.events;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Registry for all {@link MinionEventObserverFactory}.
- */
-public class EventObserverFactoryRegistry {
-  private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>();
-
-  /**
-   * Registers an event observer factory.
-   *
-   * @param taskType Task type
-   * @param eventObserverFactory Event observer factory associated with the task type
-   */
-  public void registerEventObserverFactory(String taskType, MinionEventObserverFactory eventObserverFactory) {
-    _eventObserverFactoryRegistry.put(taskType, eventObserverFactory);
-  }
-
-  /**
-   * Returns the event observer factory for the given task type.
-   *
-   * @param taskType Task type
-   * @return Event observer factory associated with the given task type, or default event observer if no one registered
-   */
-  public MinionEventObserverFactory getEventObserverFactory(String taskType) {
-    return _eventObserverFactoryRegistry.getOrDefault(taskType, DefaultMinionEventObserverFactory.getInstance());
-  }
-}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java
index 37d5cd0..5ac4535 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java
@@ -18,9 +18,23 @@
  */
 package org.apache.pinot.minion.executor;
 
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
 public class ConvertToRawIndexTaskExecutorFactory implements PinotTaskExecutorFactory {
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
+  }
+
+  @Override
   public PinotTaskExecutor create() {
     return new ConvertToRawIndexTaskExecutor();
   }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
index 66d2c86..89bee79 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
@@ -18,7 +18,22 @@
  */
 package org.apache.pinot.minion.executor;
 
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
 public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.MergeRollupTask.TASK_TYPE;
+  }
+
   @Override
   public PinotTaskExecutor create() {
     return new MergeRollupTaskExecutor();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
index 9f35af7..a51044b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
@@ -24,9 +24,17 @@ package org.apache.pinot.minion.executor;
 public interface PinotTaskExecutorFactory {
 
   /**
-   * Creates an new instance of {@link PinotTaskExecutor}.
-   *
-   * @return Pinot task executor
+   * Initializes the task executor factory.
+   */
+  void init(MinionTaskZkMetadataManager zkMetadataManager);
+
+  /**
+   * Returns the task type of the executor.
+   */
+  String getTaskType();
+
+  /**
+   * Creates a new task executor.
    */
   PinotTaskExecutor create();
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java
index 4bd885b..0851c59 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java
@@ -18,9 +18,23 @@
  */
 package org.apache.pinot.minion.executor;
 
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
 public class PurgeTaskExecutorFactory implements PinotTaskExecutorFactory {
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.PurgeTask.TASK_TYPE;
+  }
+
+  @Override
   public PinotTaskExecutor create() {
     return new PurgeTaskExecutor();
   }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index 7eabbc4..7be3876 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -18,19 +18,29 @@
  */
 package org.apache.pinot.minion.executor;
 
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
 /**
  * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
  */
+@TaskExecutorFactory
 public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+  private MinionTaskZkMetadataManager _zkMetadataManager;
 
-  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+    _zkMetadataManager = zkMetadataManager;
+  }
 
-  public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
-    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+  @Override
+  public String getTaskType() {
+    return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
   }
 
   @Override
   public PinotTaskExecutor create() {
-    return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
+    return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager);
   }
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
index ec18879..00b9e78 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -18,7 +18,22 @@
  */
 package org.apache.pinot.minion.executor;
 
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
 public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+  }
+
   @Override
   public PinotTaskExecutor create() {
     return new SegmentGenerationAndPushTaskExecutor();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index a86a39b..fbc3aab 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -21,41 +21,62 @@ package org.apache.pinot.minion.executor;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nonnull;
-import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Registry for all {@link PinotTaskExecutorFactory}.
  */
 public class TaskExecutorFactoryRegistry {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorFactoryRegistry.class);
+
   private final Map<String, PinotTaskExecutorFactory> _taskExecutorFactoryRegistry = new HashMap<>();
 
-  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
-    registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
-        new ConvertToRawIndexTaskExecutorFactory());
-    registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
-    registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
-    registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
-    registerTaskExecutorFactory(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new SegmentGenerationAndPushTaskExecutorFactory());
+  /**
+   * Registers the task executor factories via reflection.
+   * NOTE: In order to plug in a class using reflection, its fully qualified class name must contain ".executor.". This
+   *       convention can significantly reduce the time of class scanning.
+   */
+  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
+    long startTimeMs = System.currentTimeMillis();
+    Reflections reflections = new Reflections(
+        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
+            .filterInputsBy(new FilterBuilder.Include(".*\\.executor\\..*")).setScanners(new TypeAnnotationsScanner()));
+    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(TaskExecutorFactory.class, true);
+    for (Class<?> clazz : classes) {
+      TaskExecutorFactory annotation = clazz.getAnnotation(TaskExecutorFactory.class);
+      if (annotation.enabled()) {
+        try {
+          PinotTaskExecutorFactory taskExecutorFactory = (PinotTaskExecutorFactory) clazz.newInstance();
+          taskExecutorFactory.init(zkMetadataManager);
+          registerTaskExecutorFactory(taskExecutorFactory);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while initializing and registering task executor factory: {}, skipping it",
+              clazz, e);
+        }
+      }
+    }
+    LOGGER.info("Initialized TaskExecutorFactoryRegistry with {} task executor factories: {} in {}ms",
+        _taskExecutorFactoryRegistry.size(), _taskExecutorFactoryRegistry.keySet(),
+        System.currentTimeMillis() - startTimeMs);
   }
 
   /**
    * Registers a task executor factory.
-   *
-   * @param taskType Task type
-   * @param taskExecutorFactory Task executor factory associated with the task type
    */
-  public void registerTaskExecutorFactory(@Nonnull String taskType,
-      @Nonnull PinotTaskExecutorFactory taskExecutorFactory) {
-    _taskExecutorFactoryRegistry.put(taskType, taskExecutorFactory);
+  public void registerTaskExecutorFactory(PinotTaskExecutorFactory taskExecutorFactory) {
+    _taskExecutorFactoryRegistry.put(taskExecutorFactory.getTaskType(), taskExecutorFactory);
   }
 
   /**
    * Returns all registered task types.
-   *
-   * @return Set of all registered task types
    */
   public Set<String> getAllTaskTypes() {
     return _taskExecutorFactoryRegistry.keySet();
@@ -63,11 +84,8 @@ public class TaskExecutorFactoryRegistry {
 
   /**
    * Returns the task executor factory for the given task type.
-   *
-   * @param taskType Task type
-   * @return Task executor factory associated with the given task type
    */
-  public PinotTaskExecutorFactory getTaskExecutorFactory(@Nonnull String taskType) {
+  public PinotTaskExecutorFactory getTaskExecutorFactory(String taskType) {
     return _taskExecutorFactoryRegistry.get(taskType);
   }
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index a4a9552..ce5c840 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -21,16 +21,15 @@ package org.apache.pinot.minion.taskfactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.MinionContext;
-import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
-import org.apache.pinot.minion.events.MinionEventObserver;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
 import org.apache.pinot.minion.exception.FatalException;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -52,12 +51,11 @@ public class TaskFactoryRegistry {
 
   private final Map<String, TaskFactory> _taskFactoryRegistry = new HashMap<>();
 
-  public TaskFactoryRegistry(@Nonnull TaskExecutorFactoryRegistry taskExecutorFactoryRegistry,
-      @Nonnull EventObserverFactoryRegistry eventObserverFactoryRegistry) {
-    for (final String taskType : taskExecutorFactoryRegistry.getAllTaskTypes()) {
-      final PinotTaskExecutorFactory taskExecutorFactory = taskExecutorFactoryRegistry.getTaskExecutorFactory(taskType);
-      final MinionEventObserverFactory eventObserverFactory =
-          eventObserverFactoryRegistry.getEventObserverFactory(taskType);
+  public TaskFactoryRegistry(TaskExecutorFactoryRegistry taskExecutorFactoryRegistry,
+      EventObserverFactoryRegistry eventObserverFactoryRegistry) {
+    for (String taskType : taskExecutorFactoryRegistry.getAllTaskTypes()) {
+      PinotTaskExecutorFactory taskExecutorFactory = taskExecutorFactoryRegistry.getTaskExecutorFactory(taskType);
+      MinionEventObserverFactory eventObserverFactory = eventObserverFactoryRegistry.getEventObserverFactory(taskType);
 
       LOGGER.info("Registering {} with task executor factory: {}, event observer factory: {}", taskType,
           taskExecutorFactory.getClass().getSimpleName(), eventObserverFactory.getClass().getSimpleName());
@@ -121,9 +119,7 @@ public class TaskFactoryRegistry {
   }
 
   /**
-   * Get the task factory registry.
-   *
-   * @return Task factory registry
+   * Returns the task factory registry.
    */
   public Map<String, TaskFactory> getTaskFactoryRegistry() {
     return _taskFactoryRegistry;
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
similarity index 57%
copy from pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
index 7eabbc4..2aa8d68 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
@@ -16,21 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.executor;
+package org.apache.pinot.spi.annotations.minion;
 
-/**
- * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
- */
-public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
 
-  public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
-    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
-  }
+/**
+ * Annotation for Minion event observer factories.
+ *
+ * NOTE:
+ *   - The annotated class must implement the MinionEventObserverFactory interface.
+ *   - The annotated class must be in a package matching 'org.apache.pinot.*.event.*' to be auto-registered.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface EventObserverFactory {
 
-  @Override
-  public PinotTaskExecutor create() {
-    return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
-  }
+  boolean enabled() default true;
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
similarity index 57%
copy from pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
index 7eabbc4..38166b0 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
@@ -16,21 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.executor;
+package org.apache.pinot.spi.annotations.minion;
 
-/**
- * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
- */
-public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
 
-  public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
-    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
-  }
+/**
+ * Annotation for Minion task executor factories.
+ *
+ * NOTE:
+ *   - The annotated class must implement the PinotTaskExecutorFactory interface.
+ *   - The annotated class must be in a package matching 'org.apache.pinot.*.executor.*' to be auto-registered.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TaskExecutorFactory {
 
-  @Override
-  public PinotTaskExecutor create() {
-    return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
-  }
+  boolean enabled() default true;
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
similarity index 58%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
index 77d5a5b..5b5946a 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
@@ -16,17 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.minion.events;
+package org.apache.pinot.spi.annotations.minion;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
 
 /**
- * Factory for {@link MinionEventObserver}.
+ * Annotation for Minion task generators.
+ *
+ * NOTE:
+ *   - The annotated class must implement the PinotTaskGenerator interface.
+ *   - The annotated class must be in a package matching 'org.apache.pinot.*.generator.*' to be auto-registered.
  */
-public interface MinionEventObserverFactory {
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TaskGenerator {
 
-  /**
-   * Creates an new instance of {@link MinionEventObserver}.
-   *
-   * @return Minion event observer
-   */
-  MinionEventObserver create();
+  boolean enabled() default true;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org