You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/01/05 20:36:02 UTC
[incubator-pinot] branch master updated: Make minion tasks
pluggable via reflection (#6395)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8def748 Make minion tasks pluggable via reflection (#6395)
8def748 is described below
commit 8def7483b0f0d0cd09c5536f5792626d2d1f3c52
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jan 5 12:35:41 2021 -0800
Make minion tasks pluggable via reflection (#6395)
Introduces 3 annotations to plugin minion components:
- TaskGenerator: For task generators to be plugged on controller side
- TaskExecutorFactory: For task executor factories to be plugged on minion side
- EventObserverFactory: For event observer factories to be plugged on minion side
Annotated classes will automatically be plugged into the controller and minion.
---
.../generator/ConvertToRawIndexTaskGenerator.java | 7 +-
.../core/minion/generator/PinotTaskGenerator.java | 14 ++--
.../RealtimeToOfflineSegmentsTaskGenerator.java | 10 ++-
.../SegmentGenerationAndPushTaskGenerator.java | 32 ++++----
.../minion/generator/TaskGeneratorRegistry.java | 59 ++++++++++-----
...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 43 ++++++-----
.../pinot/integration/tests/ClusterTest.java | 66 +++++++++--------
.../tests/SimpleMinionClusterIntegrationTest.java | 71 +++++++++++-------
.../org/apache/pinot/minion/MinionStarter.java | 25 +++----
.../DefaultMinionEventObserver.java | 2 +-
.../DefaultMinionEventObserverFactory.java | 18 ++++-
.../minion/event/EventObserverFactoryRegistry.java | 85 ++++++++++++++++++++++
.../{events => event}/MinionEventObserver.java | 2 +-
.../MinionEventObserverFactory.java | 19 ++++-
.../events/EventObserverFactoryRegistry.java | 50 -------------
.../ConvertToRawIndexTaskExecutorFactory.java | 14 ++++
.../executor/MergeRollupTaskExecutorFactory.java | 15 ++++
.../minion/executor/PinotTaskExecutorFactory.java | 14 +++-
.../minion/executor/PurgeTaskExecutorFactory.java | 14 ++++
...altimeToOfflineSegmentsTaskExecutorFactory.java | 18 ++++-
...egmentGenerationAndPushTaskExecutorFactory.java | 15 ++++
.../executor/TaskExecutorFactoryRegistry.java | 62 ++++++++++------
.../minion/taskfactory/TaskFactoryRegistry.java | 22 +++---
.../annotations/minion/EventObserverFactory.java | 29 ++++----
.../annotations/minion/TaskExecutorFactory.java | 29 ++++----
.../spi/annotations/minion/TaskGenerator.java | 25 ++++---
26 files changed, 489 insertions(+), 271 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index 437ac93..ed7e6b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -36,12 +37,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@TaskGenerator
public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
- private final ClusterInfoAccessor _clusterInfoAccessor;
+ private ClusterInfoAccessor _clusterInfoAccessor;
- public ConvertToRawIndexTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
_clusterInfoAccessor = clusterInfoAccessor;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 6fe55ee..85eed0e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.minion.generator;
import java.util.List;
import org.apache.helix.task.JobConfig;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -30,23 +31,22 @@ import org.apache.pinot.spi.config.table.TableConfig;
public interface PinotTaskGenerator {
/**
+ * Initializes the task generator.
+ */
+ void init(ClusterInfoAccessor clusterInfoAccessor);
+
+ /**
* Returns the task type of the generator.
- *
- * @return Task type of the generator
*/
String getTaskType();
/**
* Generates a list of tasks to schedule based on the given table configs.
- *
- * @return List of tasks to schedule
*/
List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
/**
* Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
- *
- * @return Timeout in milliseconds for each task.
*/
default long getTaskTimeoutMs() {
return JobConfig.DEFAULT_TIMEOUT_PER_TASK;
@@ -54,8 +54,6 @@ public interface PinotTaskGenerator {
/**
* Returns the maximum number of concurrent tasks allowed per instance, 1 by default.
- *
- * @return Maximum number of concurrent tasks allowed per instance
*/
default int getNumConcurrentTasksPerInstance() {
return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index a0e519b..a278396 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -35,6 +35,7 @@ import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -72,15 +73,17 @@ import org.slf4j.LoggerFactory;
*
* - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
*/
+@TaskGenerator
public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
private static final String DEFAULT_BUCKET_PERIOD = "1d";
private static final String DEFAULT_BUFFER_PERIOD = "2d";
- private final ClusterInfoAccessor _clusterInfoAccessor;
+ private ClusterInfoAccessor _clusterInfoAccessor;
- public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
_clusterInfoAccessor = clusterInfoAccessor;
}
@@ -101,7 +104,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, realtimeTableName);
continue;
}
- StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ StreamConfig streamConfig =
+ new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
if (streamConfig.hasHighLevelConsumerType()) {
LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}", taskType, realtimeTableName);
continue;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index c3b3f43..7921e09 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -83,13 +84,16 @@ import org.slf4j.LoggerFactory;
* push.segmentUriSuffix - Optional, segment download uri suffix, used when push.mode=uri.
*
*/
+@TaskGenerator
public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
+ private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =
+ BatchConfigProperties.SegmentPushType.TAR;
- private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE = BatchConfigProperties.SegmentPushType.TAR;
- private final ClusterInfoAccessor _clusterInfoAccessor;
+ private ClusterInfoAccessor _clusterInfoAccessor;
- public SegmentGenerationAndPushTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
_clusterInfoAccessor = clusterInfoAccessor;
}
@@ -114,9 +118,7 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
Preconditions.checkNotNull(tableTaskConfig);
Map<String, String> taskConfigs =
tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
- if (tableConfigs == null) {
- LOGGER.warn("Skip null task config for table: {}", offlineTableName);
- }
+ Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: {}", offlineTableName);
// Get max number of tasks for this table
int tableMaxNumTasks;
@@ -181,7 +183,7 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
URI outputDirURI = null;
if (batchConfigMap.containsKey(BatchConfigProperties.OUTPUT_DIR_URI)) {
- outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
+ outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
}
String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
@@ -191,12 +193,13 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI);
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
}
- singleFileGenerationTaskConfig
- .put(BatchConfigProperties.SCHEMA, JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
- singleFileGenerationTaskConfig
- .put(BatchConfigProperties.TABLE_CONFIGS, JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.SCHEMA,
+ JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.TABLE_CONFIGS,
+ JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(sequenceID));
- singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+ singleFileGenerationTaskConfig
+ .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
if ((outputDirURI == null) || (pushMode == null)) {
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, DEFAULT_SEGMENT_PUSH_TYPE.toString());
} else {
@@ -276,8 +279,8 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
throws URISyntaxException {
URI fileURI = URI.create(uriStr);
if (fileURI.getScheme() == null) {
- return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getAuthority(),
- fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment());
+ return new URI(fullUriForPathOnlyUriStr.getScheme(), fullUriForPathOnlyUriStr.getAuthority(), fileURI.getPath(),
+ fileURI.getQuery(), fileURI.getFragment());
}
return fileURI;
}
@@ -311,5 +314,4 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
return relativeOutputURI;
}
-
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 21070d3..7aa7500 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -22,29 +22,59 @@ import com.clearspring.analytics.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Registry for all {@link PinotTaskGenerator}.
*/
public class TaskGeneratorRegistry {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskGeneratorRegistry.class);
+
private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new HashMap<>();
- public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
- registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
- registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
- registerTaskGenerator(new SegmentGenerationAndPushTaskGenerator(clusterInfoAccessor));
+ /**
+ * Registers the task generators via reflection.
+ * NOTE: In order to plugin a class using reflection, the class should include ".generator." in its class path. This
+ * convention can significantly reduce the time of class scanning.
+ */
+ public TaskGeneratorRegistry(ClusterInfoAccessor clusterInfoAccessor) {
+ long startTimeMs = System.currentTimeMillis();
+ Reflections reflections = new Reflections(
+ new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
+ .filterInputsBy(new FilterBuilder.Include(".*\\.generator\\..*"))
+ .setScanners(new TypeAnnotationsScanner()));
+ Set<Class<?>> classes = reflections.getTypesAnnotatedWith(TaskGenerator.class, true);
+ for (Class<?> clazz : classes) {
+ TaskGenerator annotation = clazz.getAnnotation(TaskGenerator.class);
+ if (annotation.enabled()) {
+ try {
+ PinotTaskGenerator taskGenerator = (PinotTaskGenerator) clazz.newInstance();
+ taskGenerator.init(clusterInfoAccessor);
+ registerTaskGenerator(taskGenerator);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while initializing and registering task generator: {}, skipping it", clazz, e);
+ }
+ }
+ }
+ LOGGER.info("Initialized TaskGeneratorRegistry with {} task generators: {} in {}ms", _taskGeneratorRegistry.size(),
+ _taskGeneratorRegistry.keySet(), System.currentTimeMillis() - startTimeMs);
}
/**
* Register a task generator.
- *
- * @param pinotTaskGenerator Task generator to be registered
*/
- public void registerTaskGenerator(@Nonnull PinotTaskGenerator pinotTaskGenerator) {
+ public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
// Task type cannot contain the task name separator
String taskType = pinotTaskGenerator.getTaskType();
Preconditions.checkArgument(!taskType.contains(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR),
@@ -54,22 +84,17 @@ public class TaskGeneratorRegistry {
}
/**
- * Get all registered task types.
- *
- * @return Set of all registered task types
+ * Returns all registered task types.
*/
- @Nonnull
public Set<String> getAllTaskTypes() {
return _taskGeneratorRegistry.keySet();
}
/**
- * Get the task generator for the given task type.
- *
- * @param taskType Task type
- * @return Task generator for the given task type
+ * Returns the task generator for the given task type.
*/
- public PinotTaskGenerator getTaskGenerator(@Nonnull String taskType) {
+ @Nullable
+ public PinotTaskGenerator getTaskGenerator(String taskType) {
return _taskGeneratorRegistry.get(taskType);
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 5aa3377..38c724d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -88,8 +88,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(metadata1));
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
// Skip task generation, if offline table
TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -141,8 +141,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(metadata1));
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
// if same task and table, IN_PROGRESS, then don't generate again
taskStatesMap.put(taskName, TaskState.IN_PROGRESS);
@@ -177,8 +177,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
@@ -188,7 +188,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(seg1));
- generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
@@ -200,7 +201,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(seg1, seg2, seg3));
- generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
}
@@ -227,8 +229,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertEquals(pinotTaskConfigs.size(), 1);
assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -246,7 +248,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
"download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(seg1, seg2));
- generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertEquals(pinotTaskConfigs.size(), 1);
assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -281,8 +284,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertEquals(pinotTaskConfigs.size(), 1);
assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -296,7 +299,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
// No segments match
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
.thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L)); // 26 May 2020 UTC
- generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertEquals(pinotTaskConfigs.size(), 0);
@@ -327,7 +331,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
taskConfigs.put("m1" + RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "MAX");
taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
- generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertEquals(pinotTaskConfigs.size(), 1);
assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
@@ -365,8 +370,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(metadata1, metadata2));
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
// last COMPLETED segment's endTime is less than windowEnd time. CONSUMING segment overlap. Skip task
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
@@ -413,8 +418,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
.thenReturn(Lists.newArrayList(metadata1));
- RealtimeToOfflineSegmentsTaskGenerator generator =
- new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index ac0f94c..e98a875 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.integration.tests;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
@@ -37,9 +35,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import javax.annotation.Nullable;
-
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -54,7 +50,6 @@ import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.core.requesthandler.PinotQueryRequest;
import org.apache.pinot.common.utils.CommonConstants.Broker;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Minion;
@@ -62,8 +57,9 @@ import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.core.requesthandler.PinotQueryRequest;
import org.apache.pinot.minion.MinionStarter;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
@@ -77,8 +73,9 @@ import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
/**
@@ -164,8 +161,8 @@ public abstract class ClusterTest extends ControllerTest {
try {
for (int i = 0; i < numServers; i++) {
configuration.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, Server.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
- configuration.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
- Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
+ configuration
+ .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
HelixServerStarter helixServerStarter = new HelixServerStarter(getHelixClusterName(), zkStr, configuration);
@@ -179,22 +176,21 @@ public abstract class ClusterTest extends ControllerTest {
// NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext
// to manage the instance level configs
- protected void startMinion(@Nullable Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry,
- @Nullable Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry) {
+ protected void startMinion(@Nullable List<PinotTaskExecutorFactory> taskExecutorFactories,
+ @Nullable List<MinionEventObserverFactory> eventObserverFactories) {
FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
try {
- _minionStarter =
- new MinionStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, new PinotConfiguration());
+ _minionStarter = new MinionStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, new PinotConfiguration());
// Register task executor factories
- if (taskExecutorFactoryRegistry != null) {
- for (Map.Entry<String, PinotTaskExecutorFactory> entry : taskExecutorFactoryRegistry.entrySet()) {
- _minionStarter.registerTaskExecutorFactory(entry.getKey(), entry.getValue());
+ if (taskExecutorFactories != null) {
+ for (PinotTaskExecutorFactory taskExecutorFactory : taskExecutorFactories) {
+ _minionStarter.registerTaskExecutorFactory(taskExecutorFactory);
}
}
// Register event observer factories
- if (eventObserverFactoryRegistry != null) {
- for (Map.Entry<String, MinionEventObserverFactory> entry : eventObserverFactoryRegistry.entrySet()) {
- _minionStarter.registerEventObserverFactory(entry.getKey(), entry.getValue());
+ if (eventObserverFactories != null) {
+ for (MinionEventObserverFactory eventObserverFactory : eventObserverFactories) {
+ _minionStarter.registerEventObserverFactory(eventObserverFactory);
}
}
_minionStarter.start();
@@ -264,11 +260,13 @@ public abstract class ClusterTest extends ControllerTest {
if (numSegments == 1) {
File segmentTarFile = segmentTarFiles[0];
if (System.currentTimeMillis() % 2 == 0) {
- assertEquals(fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
+ assertEquals(fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode(),
HttpStatus.SC_OK);
} else {
assertEquals(
- uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile), HttpStatus.SC_OK);
+ uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile),
+ HttpStatus.SC_OK);
}
} else {
// Upload all segments in parallel
@@ -277,9 +275,12 @@ public abstract class ClusterTest extends ControllerTest {
for (File segmentTarFile : segmentTarFiles) {
futures.add(executorService.submit(() -> {
if (System.currentTimeMillis() % 2 == 0) {
- return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName).getStatusCode();
+ return fileUploadDownloadClient
+ .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName)
+ .getStatusCode();
} else {
- return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile);
+ return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient,
+ segmentTarFile);
}
}));
}
@@ -295,14 +296,17 @@ public abstract class ClusterTest extends ControllerTest {
FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
throws IOException, HttpErrorStatusException {
List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
- "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder.encode(segmentTarFile.getName(), StandardCharsets.UTF_8.toString())),
- new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder
+ .encode(segmentTarFile.getName(), StandardCharsets.UTF_8.toString())),
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+ FileUploadDownloadClient.FileUploadType.METADATA.toString()));
// Add table name as a request parameter
- NameValuePair
- tableNameValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+ NameValuePair tableNameValuePair =
+ new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
return fileUploadDownloadClient
- .uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters, fileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+ .uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, headers, parameters,
+ fileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
}
public static class AvroFileSchemaKafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index e0a4ecc..b2c2674 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -32,10 +32,11 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.minion.events.MinionEventObserver;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.minion.executor.BaseTaskExecutor;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -56,6 +57,7 @@ import static org.testng.Assert.*;
* minion functionality.
*/
public class SimpleMinionClusterIntegrationTest extends ClusterTest {
+ private static final String TASK_TYPE = "TestTask";
private static final String TABLE_NAME_1 = "testTable1";
private static final String TABLE_NAME_2 = "testTable2";
private static final String TABLE_NAME_3 = "testTable3";
@@ -79,8 +81,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
startServer();
// Add 3 offline tables, where 2 of them have TestTask enabled
- TableTaskConfig taskConfig =
- new TableTaskConfig(Collections.singletonMap(TestTaskGenerator.TASK_TYPE, Collections.emptyMap()));
+ TableTaskConfig taskConfig = new TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap()));
addTableConfig(
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(taskConfig).build());
addTableConfig(
@@ -91,13 +92,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
_taskManager = _controllerStarter.getTaskManager();
// Register the test task generator into task manager
- _taskManager.registerTaskGenerator(new TestTaskGenerator(_taskManager.getClusterInfoAccessor()));
+ PinotTaskGenerator taskGenerator = new TestTaskGenerator();
+ taskGenerator.init(_taskManager.getClusterInfoAccessor());
+ _taskManager.registerTaskGenerator(taskGenerator);
- Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry =
- Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestTaskExecutorFactory());
- Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry =
- Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestEventObserverFactory());
- startMinion(taskExecutorFactoryRegistry, eventObserverFactoryRegistry);
+ startMinion(Collections.singletonList(new TestTaskExecutorFactory()),
+ Collections.singletonList(new TestEventObserverFactory()));
}
@Test
@@ -106,20 +106,20 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
HOLD.set(true);
// Should create the task queues and generate a task
- assertNotNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
+ assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
- .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TestTaskGenerator.TASK_TYPE)));
+ .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
// Should generate one more task
- assertNotNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
+ assertNotNull(_taskManager.scheduleTask(TASK_TYPE));
// Should not generate more tasks
- assertNull(_taskManager.scheduleTasks().get(TestTaskGenerator.TASK_TYPE));
- assertNull(_taskManager.scheduleTask(TestTaskGenerator.TASK_TYPE));
+ assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
+ assertNull(_taskManager.scheduleTask(TASK_TYPE));
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
- Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TestTaskGenerator.TASK_TYPE).values();
+ Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
assertEquals(taskStates.size(), 2);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.IN_PROGRESS) {
@@ -134,11 +134,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS");
// Stop the task queue
- _helixTaskResourceManager.stopTaskQueue(TestTaskGenerator.TASK_TYPE);
+ _helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
// Wait at most 60 seconds for all tasks STOPPED
TestUtils.waitForCondition(input -> {
- Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TestTaskGenerator.TASK_TYPE).values();
+ Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
assertEquals(taskStates.size(), 2);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.STOPPED) {
@@ -153,12 +153,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
// Resume the task queue, and let the task complete
- _helixTaskResourceManager.resumeTaskQueue(TestTaskGenerator.TASK_TYPE);
+ _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
HOLD.set(false);
// Wait at most 60 seconds for all tasks COMPLETED
TestUtils.waitForCondition(input -> {
- Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TestTaskGenerator.TASK_TYPE).values();
+ Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
assertEquals(taskStates.size(), 2);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.COMPLETED) {
@@ -173,10 +173,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
// Delete the task queue
- _helixTaskResourceManager.deleteTaskQueue(TestTaskGenerator.TASK_TYPE, false);
+ _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
// Wait at most 60 seconds for task queue to be deleted
- TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TestTaskGenerator.TASK_TYPE),
+ TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE),
STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
}
@@ -194,11 +194,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
}
private static class TestTaskGenerator implements PinotTaskGenerator {
- public static final String TASK_TYPE = "TestTask";
- private final ClusterInfoAccessor _clusterInfoAccessor;
+ private ClusterInfoAccessor _clusterInfoAccessor;
- public TestTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
_clusterInfoAccessor = clusterInfoAccessor;
}
@@ -228,6 +228,16 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
}
public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return TASK_TYPE;
+ }
+
@Override
public PinotTaskExecutor create() {
return new BaseTaskExecutor() {
@@ -237,7 +247,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
assertNotNull(MINION_CONTEXT.getMinionMetrics());
assertNotNull(MINION_CONTEXT.getHelixPropertyStore());
- assertEquals(pinotTaskConfig.getTaskType(), TestTaskGenerator.TASK_TYPE);
+ assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE);
Map<String, String> configs = pinotTaskConfig.getConfigs();
assertEquals(configs.size(), 2);
String offlineTableName = configs.get("tableName");
@@ -260,6 +270,15 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
public static class TestEventObserverFactory implements MinionEventObserverFactory {
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return TASK_TYPE;
+ }
+
+ @Override
public MinionEventObserver create() {
return new MinionEventObserver() {
@Override
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 375b73a..c8d4883 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -18,13 +18,10 @@
*/
package org.apache.pinot.minion;
-import static org.apache.pinot.common.utils.CommonConstants.HTTPS_PROTOCOL;
-
+import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
import java.io.IOException;
-
import javax.net.ssl.SSLContext;
-
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
@@ -40,8 +37,8 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
@@ -56,7 +53,7 @@ import org.apache.pinot.spi.services.ServiceStartable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.yammer.metrics.core.MetricsRegistry;
+import static org.apache.pinot.common.utils.CommonConstants.HTTPS_PROTOCOL;
/**
@@ -78,13 +75,13 @@ public class MinionStarter implements ServiceStartable {
throws Exception {
_config = config;
_instanceId = config.getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
- CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE + NetUtil.getHostAddress() + "_"
- + _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT));
+ CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE + NetUtil.getHostAddress() + "_" + _config
+ .getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT));
setupHelixSystemProperties();
_helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
_taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
- _eventObserverFactoryRegistry = new EventObserverFactoryRegistry();
+ _eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
}
private void setupHelixSystemProperties() {
@@ -100,16 +97,16 @@ public class MinionStarter implements ServiceStartable {
* Registers a task executor factory.
* <p>This is for pluggable task executor factories.
*/
- public void registerTaskExecutorFactory(String taskType, PinotTaskExecutorFactory taskExecutorFactory) {
- _taskExecutorFactoryRegistry.registerTaskExecutorFactory(taskType, taskExecutorFactory);
+ public void registerTaskExecutorFactory(PinotTaskExecutorFactory taskExecutorFactory) {
+ _taskExecutorFactoryRegistry.registerTaskExecutorFactory(taskExecutorFactory);
}
/**
* Registers an event observer factory.
* <p>This is for pluggable event observer factories.
*/
- public void registerEventObserverFactory(String taskType, MinionEventObserverFactory eventObserverFactory) {
- _eventObserverFactoryRegistry.registerEventObserverFactory(taskType, eventObserverFactory);
+ public void registerEventObserverFactory(MinionEventObserverFactory eventObserverFactory) {
+ _eventObserverFactoryRegistry.registerEventObserverFactory(eventObserverFactory);
}
@Override
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserver.java
similarity index 97%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserver.java
rename to pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserver.java
index de7e44c..600f9b4 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserver.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserver.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
import javax.annotation.Nullable;
import org.apache.pinot.core.minion.PinotTaskConfig;
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserverFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserverFactory.java
similarity index 75%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserverFactory.java
rename to pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserverFactory.java
index 900e74d..7129d4c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/DefaultMinionEventObserverFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/DefaultMinionEventObserverFactory.java
@@ -16,11 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
+
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+
public class DefaultMinionEventObserverFactory implements MinionEventObserverFactory {
- private static final MinionEventObserver DEFAULT_EVENT_OBSERVER = new DefaultMinionEventObserver();
private static final DefaultMinionEventObserverFactory INSTANCE = new DefaultMinionEventObserverFactory();
+ private static final DefaultMinionEventObserver OBSERVER_INSTANCE = new DefaultMinionEventObserver();
private DefaultMinionEventObserverFactory() {
}
@@ -30,7 +33,16 @@ public class DefaultMinionEventObserverFactory implements MinionEventObserverFac
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return null;
+ }
+
+ @Override
public MinionEventObserver create() {
- return DEFAULT_EVENT_OBSERVER;
+ return OBSERVER_INSTANCE;
}
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
new file mode 100644
index 0000000..fa4815b
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.event;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Registry for all {@link MinionEventObserverFactory}.
+ */
+public class EventObserverFactoryRegistry {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventObserverFactoryRegistry.class);
+
+ private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>();
+
+ /**
+ * Registers the event observer factories via reflection.
+ * NOTE: In order to plugin a class using reflection, the class should include ".event." in its class path. This
+ * convention can significantly reduce the time of class scanning.
+ */
+ public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
+ long startTimeMs = System.currentTimeMillis();
+ Reflections reflections = new Reflections(
+ new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
+ .filterInputsBy(new FilterBuilder.Include(".*\\.event\\..*")).setScanners(new TypeAnnotationsScanner()));
+ Set<Class<?>> classes = reflections.getTypesAnnotatedWith(EventObserverFactory.class, true);
+ for (Class<?> clazz : classes) {
+ EventObserverFactory annotation = clazz.getAnnotation(EventObserverFactory.class);
+ if (annotation.enabled()) {
+ try {
+ MinionEventObserverFactory eventObserverFactory = (MinionEventObserverFactory) clazz.newInstance();
+ eventObserverFactory.init(zkMetadataManager);
+ registerEventObserverFactory(eventObserverFactory);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while initializing and registering event observer factory: {}, skipping it",
+ clazz, e);
+ }
+ }
+ }
+ LOGGER.info("Initialized EventObserverFactoryRegistry with {} event observer factories: {} in {}ms",
+ _eventObserverFactoryRegistry.size(), _eventObserverFactoryRegistry.keySet(),
+ System.currentTimeMillis() - startTimeMs);
+ }
+
+ /**
+ * Registers an event observer factory.
+ */
+ public void registerEventObserverFactory(MinionEventObserverFactory eventObserverFactory) {
+ _eventObserverFactoryRegistry.put(eventObserverFactory.getTaskType(), eventObserverFactory);
+ }
+
+ /**
+ * Returns the event observer factory for the given task type, or default event observer if no one is registered.
+ */
+ public MinionEventObserverFactory getEventObserverFactory(String taskType) {
+ return _eventObserverFactoryRegistry.getOrDefault(taskType, DefaultMinionEventObserverFactory.getInstance());
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
similarity index 97%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserver.java
rename to pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
index 6e68472..985ff8c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserver.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
import javax.annotation.Nullable;
import org.apache.pinot.core.minion.PinotTaskConfig;
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserverFactory.java
similarity index 72%
copy from pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
copy to pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserverFactory.java
index 77d5a5b..329183c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserverFactory.java
@@ -16,7 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.events;
+package org.apache.pinot.minion.event;
+
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+
/**
* Factory for {@link MinionEventObserver}.
@@ -24,9 +27,17 @@ package org.apache.pinot.minion.events;
public interface MinionEventObserverFactory {
/**
- * Creates an new instance of {@link MinionEventObserver}.
- *
- * @return Minion event observer
+ * Initializes the task executor factory.
+ */
+ void init(MinionTaskZkMetadataManager zkMetadataManager);
+
+ /**
+ * Returns the task type of the event observer.
+ */
+ String getTaskType();
+
+ /**
+ * Creates a new task event observer.
*/
MinionEventObserver create();
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/events/EventObserverFactoryRegistry.java
deleted file mode 100644
index 8a4e2d7..0000000
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/EventObserverFactoryRegistry.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.minion.events;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Registry for all {@link MinionEventObserverFactory}.
- */
-public class EventObserverFactoryRegistry {
- private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>();
-
- /**
- * Registers an event observer factory.
- *
- * @param taskType Task type
- * @param eventObserverFactory Event observer factory associated with the task type
- */
- public void registerEventObserverFactory(String taskType, MinionEventObserverFactory eventObserverFactory) {
- _eventObserverFactoryRegistry.put(taskType, eventObserverFactory);
- }
-
- /**
- * Returns the event observer factory for the given task type.
- *
- * @param taskType Task type
- * @return Event observer factory associated with the given task type, or default event observer if no one registered
- */
- public MinionEventObserverFactory getEventObserverFactory(String taskType) {
- return _eventObserverFactoryRegistry.getOrDefault(taskType, DefaultMinionEventObserverFactory.getInstance());
- }
-}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java
index 37d5cd0..5ac4535 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/ConvertToRawIndexTaskExecutorFactory.java
@@ -18,9 +18,23 @@
*/
package org.apache.pinot.minion.executor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
public class ConvertToRawIndexTaskExecutorFactory implements PinotTaskExecutorFactory {
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
+ }
+
+ @Override
public PinotTaskExecutor create() {
return new ConvertToRawIndexTaskExecutor();
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
index 66d2c86..89bee79 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
@@ -18,7 +18,22 @@
*/
package org.apache.pinot.minion.executor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.MergeRollupTask.TASK_TYPE;
+ }
+
@Override
public PinotTaskExecutor create() {
return new MergeRollupTaskExecutor();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
index 9f35af7..a51044b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
@@ -24,9 +24,17 @@ package org.apache.pinot.minion.executor;
public interface PinotTaskExecutorFactory {
/**
- * Creates an new instance of {@link PinotTaskExecutor}.
- *
- * @return Pinot task executor
+ * Initializes the task executor factory.
+ */
+ void init(MinionTaskZkMetadataManager zkMetadataManager);
+
+ /**
+ * Returns the task type of the executor.
+ */
+ String getTaskType();
+
+ /**
+ * Creates a new task executor.
*/
PinotTaskExecutor create();
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java
index 4bd885b..0851c59 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PurgeTaskExecutorFactory.java
@@ -18,9 +18,23 @@
*/
package org.apache.pinot.minion.executor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
public class PurgeTaskExecutorFactory implements PinotTaskExecutorFactory {
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.PurgeTask.TASK_TYPE;
+ }
+
+ @Override
public PinotTaskExecutor create() {
return new PurgeTaskExecutor();
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index 7eabbc4..7be3876 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -18,19 +18,29 @@
*/
package org.apache.pinot.minion.executor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
/**
* Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
*/
+@TaskExecutorFactory
public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+ private MinionTaskZkMetadataManager _zkMetadataManager;
- private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ _zkMetadataManager = zkMetadataManager;
+ }
- public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
- _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+ @Override
+ public String getTaskType() {
+ return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
}
@Override
public PinotTaskExecutor create() {
- return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
+ return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager);
}
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
index ec18879..00b9e78 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -18,7 +18,22 @@
*/
package org.apache.pinot.minion.executor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+ }
+
@Override
public PinotTaskExecutor create() {
return new SegmentGenerationAndPushTaskExecutor();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index a86a39b..fbc3aab 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -21,41 +21,62 @@ package org.apache.pinot.minion.executor;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nonnull;
-import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Registry for all {@link PinotTaskExecutorFactory}.
*/
public class TaskExecutorFactoryRegistry {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorFactoryRegistry.class);
+
private final Map<String, PinotTaskExecutorFactory> _taskExecutorFactoryRegistry = new HashMap<>();
- public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
- registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
- new ConvertToRawIndexTaskExecutorFactory());
- registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
- registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
- registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
- new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
- registerTaskExecutorFactory(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new SegmentGenerationAndPushTaskExecutorFactory());
+ /**
+ * Registers the task executor factories via reflection.
+ * NOTE: In order to plugin a class using reflection, the class should include ".executor." in its class path. This
+ * convention can significantly reduce the time of class scanning.
+ */
+ public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
+ long startTimeMs = System.currentTimeMillis();
+ Reflections reflections = new Reflections(
+ new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
+ .filterInputsBy(new FilterBuilder.Include(".*\\.executor\\..*")).setScanners(new TypeAnnotationsScanner()));
+ Set<Class<?>> classes = reflections.getTypesAnnotatedWith(TaskExecutorFactory.class, true);
+ for (Class<?> clazz : classes) {
+ TaskExecutorFactory annotation = clazz.getAnnotation(TaskExecutorFactory.class);
+ if (annotation.enabled()) {
+ try {
+ PinotTaskExecutorFactory taskExecutorFactory = (PinotTaskExecutorFactory) clazz.newInstance();
+ taskExecutorFactory.init(zkMetadataManager);
+ registerTaskExecutorFactory(taskExecutorFactory);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while initializing and registering task executor factory: {}, skipping it",
+ clazz, e);
+ }
+ }
+ }
+ LOGGER.info("Initialized TaskExecutorFactoryRegistry with {} task executor factories: {} in {}ms",
+ _taskExecutorFactoryRegistry.size(), _taskExecutorFactoryRegistry.keySet(),
+ System.currentTimeMillis() - startTimeMs);
}
/**
* Registers a task executor factory.
- *
- * @param taskType Task type
- * @param taskExecutorFactory Task executor factory associated with the task type
*/
- public void registerTaskExecutorFactory(@Nonnull String taskType,
- @Nonnull PinotTaskExecutorFactory taskExecutorFactory) {
- _taskExecutorFactoryRegistry.put(taskType, taskExecutorFactory);
+ public void registerTaskExecutorFactory(PinotTaskExecutorFactory taskExecutorFactory) {
+ _taskExecutorFactoryRegistry.put(taskExecutorFactory.getTaskType(), taskExecutorFactory);
}
/**
* Returns all registered task types.
- *
- * @return Set of all registered task types
*/
public Set<String> getAllTaskTypes() {
return _taskExecutorFactoryRegistry.keySet();
@@ -63,11 +84,8 @@ public class TaskExecutorFactoryRegistry {
/**
* Returns the task executor factory for the given task type.
- *
- * @param taskType Task type
- * @return Task executor factory associated with the given task type
*/
- public PinotTaskExecutorFactory getTaskExecutorFactory(@Nonnull String taskType) {
+ public PinotTaskExecutorFactory getTaskExecutorFactory(String taskType) {
return _taskExecutorFactoryRegistry.get(taskType);
}
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index a4a9552..ce5c840 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -21,16 +21,15 @@ package org.apache.pinot.minion.taskfactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionContext;
-import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
-import org.apache.pinot.minion.events.MinionEventObserver;
-import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.event.EventObserverFactoryRegistry;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
import org.apache.pinot.minion.exception.FatalException;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -52,12 +51,11 @@ public class TaskFactoryRegistry {
private final Map<String, TaskFactory> _taskFactoryRegistry = new HashMap<>();
- public TaskFactoryRegistry(@Nonnull TaskExecutorFactoryRegistry taskExecutorFactoryRegistry,
- @Nonnull EventObserverFactoryRegistry eventObserverFactoryRegistry) {
- for (final String taskType : taskExecutorFactoryRegistry.getAllTaskTypes()) {
- final PinotTaskExecutorFactory taskExecutorFactory = taskExecutorFactoryRegistry.getTaskExecutorFactory(taskType);
- final MinionEventObserverFactory eventObserverFactory =
- eventObserverFactoryRegistry.getEventObserverFactory(taskType);
+ public TaskFactoryRegistry(TaskExecutorFactoryRegistry taskExecutorFactoryRegistry,
+ EventObserverFactoryRegistry eventObserverFactoryRegistry) {
+ for (String taskType : taskExecutorFactoryRegistry.getAllTaskTypes()) {
+ PinotTaskExecutorFactory taskExecutorFactory = taskExecutorFactoryRegistry.getTaskExecutorFactory(taskType);
+ MinionEventObserverFactory eventObserverFactory = eventObserverFactoryRegistry.getEventObserverFactory(taskType);
LOGGER.info("Registering {} with task executor factory: {}, event observer factory: {}", taskType,
taskExecutorFactory.getClass().getSimpleName(), eventObserverFactory.getClass().getSimpleName());
@@ -121,9 +119,7 @@ public class TaskFactoryRegistry {
}
/**
- * Get the task factory registry.
- *
- * @return Task factory registry
+ * Returns the task factory registry.
*/
public Map<String, TaskFactory> getTaskFactoryRegistry() {
return _taskFactoryRegistry;
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
similarity index 57%
copy from pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
index 7eabbc4..2aa8d68 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
@@ -16,21 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.executor;
+package org.apache.pinot.spi.annotations.minion;
-/**
- * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
- */
-public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
- private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
- public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
- _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
- }
+/**
+ * Annotation for Minion event observer factories.
+ *
+ * NOTE:
+ * - The annotated class must implement the MinionEventObserverFactory interface
+ * - The annotated class must be under the package of name 'org.apache.pinot.*.event.*' to be auto-registered.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface EventObserverFactory {
- @Override
- public PinotTaskExecutor create() {
- return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
- }
+ boolean enabled() default true;
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
similarity index 57%
copy from pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
index 7eabbc4..38166b0 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
@@ -16,21 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.executor;
+package org.apache.pinot.spi.annotations.minion;
-/**
- * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
- */
-public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
- private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
- public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
- _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
- }
+/**
+ * Annotation for Minion task executor factories.
+ *
+ * NOTE:
+ * - The annotated class must implement the PinotTaskExecutorFactory interface
+ * - The annotated class must be under the package of name 'org.apache.pinot.*.executor.*' to be auto-registered.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TaskExecutorFactory {
- @Override
- public PinotTaskExecutor create() {
- return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
- }
+ boolean enabled() default true;
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
similarity index 58%
rename from pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
index 77d5a5b..5b5946a 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/events/MinionEventObserverFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
@@ -16,17 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.minion.events;
+package org.apache.pinot.spi.annotations.minion;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
/**
- * Factory for {@link MinionEventObserver}.
+ * Annotation for Minion task generators.
+ *
+ * NOTE:
+ * - The annotated class must implement the PinotTaskGenerator interface
+ * - The annotated class must be under the package of name 'org.apache.pinot.*.generator.*' to be auto-registered.
*/
-public interface MinionEventObserverFactory {
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TaskGenerator {
- /**
- * Creates an new instance of {@link MinionEventObserver}.
- *
- * @return Minion event observer
- */
- MinionEventObserver create();
+ boolean enabled() default true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org