You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/27 04:53:22 UTC
[incubator-pinot] branch master updated: Unify the minion plug-in
package regex path (#6980)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0b5dcb7 Unify the minion plug-in package regex path (#6980)
0b5dcb7 is described below
commit 0b5dcb7aed1b85ab26345ab6c1076ccb88358f4a
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed May 26 21:53:00 2021 -0700
Unify the minion plug-in package regex path (#6980)
## Description
Unify the package regex path for the pluggable minion classes (`PinotTaskGenerator`, `PinotTaskExecutorFactory`, `MinionEventObserverFactory`) to `org.apache.pinot.*.plugin.minion.tasks.*`.
Modify `SimpleMinionClusterIntegrationTest` to use the pluggable classes.
## Release Notes
The regex path for the pluggable `MinionEventObserverFactory` is changed from `org.apache.pinot.*.event.*` to `org.apache.pinot.*.plugin.minion.tasks.*`.
---
.../minion/generator/TaskGeneratorRegistry.java | 2 +-
.../tests/BasicAuthBatchIntegrationTest.java | 28 ++--
.../tests/BasicAuthRealtimeIntegrationTest.java | 5 +-
.../pinot/integration/tests/ClusterTest.java | 24 +--
...vertToRawIndexMinionClusterIntegrationTest.java | 2 +-
...fflineSegmentsMinionClusterIntegrationTest.java | 2 +-
.../tests/SimpleMinionClusterIntegrationTest.java | 161 ++-------------------
.../minion/tasks/TestEventObserverFactory.java | 73 ++++++++++
.../minion/tasks/TestTaskExecutorFactory.java | 81 +++++++++++
.../plugin/minion/tasks/TestTaskGenerator.java | 72 +++++++++
.../minion/event/EventObserverFactoryRegistry.java | 9 +-
.../executor/TaskExecutorFactoryRegistry.java | 6 +-
.../annotations/minion/EventObserverFactory.java | 2 +-
.../annotations/minion/TaskExecutorFactory.java | 2 +-
.../spi/annotations/minion/TaskGenerator.java | 2 +-
15 files changed, 276 insertions(+), 195 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 95dee4b..083996d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -40,7 +40,7 @@ public class TaskGeneratorRegistry {
private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new HashMap<>();
/**
- * The package regex pattern for auto-registered {@link TaskGenerator}.
+ * The package regex pattern for auto-registered {@link PinotTaskGenerator}.
*/
public static final String TASK_GENERATOR_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
index 1bb6825..b5d0bb7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java
@@ -24,7 +24,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Collections;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -63,7 +62,7 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
startController();
startBroker();
startServer();
- startMinion(Collections.emptyList(), Collections.emptyList());
+ startMinion();
}
@AfterClass(alwaysRun = true)
@@ -99,8 +98,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
@Test
public void testBrokerNoAuth()
throws Exception {
- JsonNode response =
- JsonUtils.stringToJsonNode(sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}"));
+ JsonNode response = JsonUtils.stringToJsonNode(
+ sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}"));
Assert.assertFalse(response.has("resultTable"), "must not return result table");
Assert.assertTrue(response.get("exceptions").get(0).get("errorCode").asInt() != 0, "must return error code");
}
@@ -109,7 +108,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
public void testBroker()
throws Exception {
JsonNode response = JsonUtils.stringToJsonNode(
- sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}", AUTH_HEADER));
+ sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}",
+ AUTH_HEADER));
Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG",
"must return result with LONG value");
Assert.assertTrue(response.get("exceptions").isEmpty(), "must not return exception");
@@ -118,7 +118,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
@Test
public void testControllerGetTables()
throws Exception {
- JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest("http://localhost:" + getControllerPort() + "/tables", AUTH_HEADER));
+ JsonNode response =
+ JsonUtils.stringToJsonNode(sendGetRequest("http://localhost:" + getControllerPort() + "/tables", AUTH_HEADER));
Assert.assertTrue(response.get("tables").isArray(), "must return table array");
}
@@ -155,18 +156,17 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
// patch ingestion job file
String jobFileContents = IOUtils.toString(new FileInputStream(jobFile));
- IOUtils.write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())),
- new FileOutputStream(jobFile));
+ IOUtils
+ .write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())), new FileOutputStream(jobFile));
- new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), AUTH_TOKEN)
- .execute();
+ new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), AUTH_TOKEN).execute();
Thread.sleep(5000);
// admin with full access
JsonNode response = JsonUtils.stringToJsonNode(
- sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT count(*) FROM baseballStats\"}",
- AUTH_HEADER));
+ sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql",
+ "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", AUTH_HEADER));
Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG",
"must return result with LONG value");
Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "count(*)",
@@ -177,8 +177,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest {
// user with valid auth but no table access
JsonNode responseUser = JsonUtils.stringToJsonNode(
- sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT count(*) FROM baseballStats\"}",
- AUTH_HEADER_USER));
+ sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql",
+ "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", AUTH_HEADER_USER));
Assert.assertFalse(responseUser.has("resultTable"), "must not return result table");
Assert.assertTrue(responseUser.get("exceptions").get(0).get("errorCode").asInt() != 0, "must return error code");
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
index 05c2264..4a9105e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java
@@ -61,7 +61,7 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest
startController();
startBroker();
startServer();
- startMinion(null, null);
+ startMinion();
// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
@@ -141,8 +141,7 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest
@Override
protected Connection getPinotConnection() {
if (_pinotConnection == null) {
- _pinotConnection =
- ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), AUTH_HEADER);
+ _pinotConnection = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), AUTH_HEADER);
}
return _pinotConnection;
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index fdb1d5c..83cd38b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -35,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import javax.annotation.Nullable;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -53,8 +52,6 @@ import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.minion.MinionStarter;
-import org.apache.pinot.minion.event.MinionEventObserverFactory;
-import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.server.starter.helix.DefaultHelixStarterServerConfig;
@@ -161,8 +158,7 @@ public abstract class ClusterTest extends ControllerTest {
}
protected void startServer(PinotConfiguration configuration) {
- startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT,
- getZkUrl());
+ startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
}
protected void startServers(int numServers) {
@@ -206,26 +202,13 @@ public abstract class ClusterTest extends ControllerTest {
// NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext
// to manage the instance level configs
- protected void startMinion(@Nullable List<PinotTaskExecutorFactory> taskExecutorFactories,
- @Nullable List<MinionEventObserverFactory> eventObserverFactories) {
+ protected void startMinion() {
FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR));
try {
PinotConfiguration minionConf = getDefaultMinionConfiguration();
minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
_minionStarter = new MinionStarter(minionConf);
- // Register task executor factories
- if (taskExecutorFactories != null) {
- for (PinotTaskExecutorFactory taskExecutorFactory : taskExecutorFactories) {
- _minionStarter.registerTaskExecutorFactory(taskExecutorFactory);
- }
- }
- // Register event observer factories
- if (eventObserverFactories != null) {
- for (MinionEventObserverFactory eventObserverFactory : eventObserverFactories) {
- _minionStarter.registerEventObserverFactory(eventObserverFactory);
- }
- }
_minionStarter.start();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -420,7 +403,8 @@ public abstract class ClusterTest extends ControllerTest {
/**
* Queries the broker's pql query endpoint (/query)
*/
- public static JsonNode postQuery(String query, String brokerBaseApiUrl, boolean enableTrace, String queryType, Map<String, String> headers)
+ public static JsonNode postQuery(String query, String brokerBaseApiUrl, boolean enableTrace, String queryType,
+ Map<String, String> headers)
throws Exception {
ObjectNode payload = JsonUtils.newObjectNode();
payload.put(queryType, query);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 1d96c71..83312b2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -81,7 +81,7 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
// The parent setUp() sets up Zookeeper, Kafka, controller, broker and servers
super.setUp();
- startMinion(null, null);
+ startMinion();
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 91ca0aa..6a8f654 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -71,7 +71,7 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt
// Setup realtime table, and blank offline table
super.setUp();
addTableConfig(createOfflineTableConfig());
- startMinion(null, null);
+ startMinion();
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index c753611..17e984c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -18,34 +18,19 @@
*/
package org.apache.pinot.integration.tests;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
-import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.minion.event.MinionEventObserver;
-import org.apache.pinot.minion.event.MinionEventObserverFactory;
-import org.apache.pinot.minion.exception.TaskCancelledException;
-import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
-import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
-import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -59,20 +44,21 @@ import static org.testng.Assert.*;
* minion functionality.
*/
public class SimpleMinionClusterIntegrationTest extends ClusterTest {
- private static final String TASK_TYPE = "TestTask";
- private static final String TABLE_NAME_1 = "testTable1";
- private static final String TABLE_NAME_2 = "testTable2";
- private static final String TABLE_NAME_3 = "testTable3";
+ // Accessed by the plug-in classes
+ public static final String TASK_TYPE = "TestTask";
+ public static final String TABLE_NAME_1 = "testTable1";
+ public static final String TABLE_NAME_2 = "testTable2";
+ public static final String TABLE_NAME_3 = "testTable3";
+ public static final int NUM_TASKS = 2;
+ public static final int NUM_CONFIGS = 3;
+ public static final AtomicBoolean HOLD = new AtomicBoolean();
+ public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
+ public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
+ public static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean();
+ public static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean();
+
private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L; // 1 minute
private static final long ZK_CALLBACK_TIMEOUT_MS = 30_000L; // 30 seconds
- private static final int NUM_TASKS = 2;
- private static final int NUM_CONFIGS = 3;
-
- private static final AtomicBoolean HOLD = new AtomicBoolean();
- private static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
- private static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
- private static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean();
- private static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean();
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotTaskManager _taskManager;
@@ -96,13 +82,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
- // Register the test task generator into task manager
- PinotTaskGenerator taskGenerator = new TestTaskGenerator();
- taskGenerator.init(_taskManager.getClusterInfoAccessor());
- _taskManager.registerTaskGenerator(taskGenerator);
-
- startMinion(Collections.singletonList(new TestTaskExecutorFactory()),
- Collections.singletonList(new TestEventObserverFactory()));
+ startMinion();
}
@Test
@@ -229,117 +209,4 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
stopController();
stopZk();
}
-
- private static class TestTaskGenerator implements PinotTaskGenerator {
-
- private ClusterInfoAccessor _clusterInfoAccessor;
-
- @Override
- public void init(ClusterInfoAccessor clusterInfoAccessor) {
- _clusterInfoAccessor = clusterInfoAccessor;
- }
-
- @Override
- public String getTaskType() {
- return TASK_TYPE;
- }
-
- @Override
- public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
- assertEquals(tableConfigs.size(), NUM_TASKS);
-
- // Generate at most 2 tasks
- if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= NUM_TASKS) {
- return Collections.emptyList();
- }
-
- List<PinotTaskConfig> taskConfigs = new ArrayList<>();
- for (TableConfig tableConfig : tableConfigs) {
- Map<String, String> configs = new HashMap<>();
- configs.put("tableName", tableConfig.getTableName());
- configs.put("tableType", tableConfig.getTableType().toString());
- taskConfigs.add(new PinotTaskConfig(TASK_TYPE, configs));
- }
- return taskConfigs;
- }
- }
-
- public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
-
- @Override
- public void init(MinionTaskZkMetadataManager zkMetadataManager) {
- }
-
- @Override
- public String getTaskType() {
- return TASK_TYPE;
- }
-
- @Override
- public PinotTaskExecutor create() {
- return new BaseTaskExecutor() {
- @Override
- public Boolean executeTask(PinotTaskConfig pinotTaskConfig) {
- assertTrue(MINION_CONTEXT.getDataDir().exists());
- assertNotNull(MINION_CONTEXT.getMinionMetrics());
- assertNotNull(MINION_CONTEXT.getHelixPropertyStore());
-
- assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE);
- Map<String, String> configs = pinotTaskConfig.getConfigs();
- assertEquals(configs.size(), NUM_CONFIGS);
- String offlineTableName = configs.get("tableName");
- assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE);
- String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
- assertTrue(rawTableName.equals(TABLE_NAME_1) || rawTableName.equals(TABLE_NAME_2));
- assertEquals(configs.get("tableType"), TableType.OFFLINE.toString());
-
- do {
- if (_cancelled) {
- throw new TaskCancelledException("Task has been cancelled");
- }
- } while (HOLD.get());
- return true;
- }
- };
- }
- }
-
- public static class TestEventObserverFactory implements MinionEventObserverFactory {
-
- @Override
- public void init(MinionTaskZkMetadataManager zkMetadataManager) {
- }
-
- @Override
- public String getTaskType() {
- return TASK_TYPE;
- }
-
- @Override
- public MinionEventObserver create() {
- return new MinionEventObserver() {
- @Override
- public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
- TASK_START_NOTIFIED.set(true);
- }
-
- @Override
- public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) {
- assertTrue(executionResult instanceof Boolean);
- assertTrue((Boolean) executionResult);
- TASK_SUCCESS_NOTIFIED.set(true);
- }
-
- @Override
- public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
- TASK_CANCELLED_NOTIFIED.set(true);
- }
-
- @Override
- public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) {
- TASK_ERROR_NOTIFIED.set(true);
- }
- };
- }
- }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java
new file mode 100644
index 0000000..91ff5f1
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.plugin.minion.tasks;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObserverFactory;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Event observer factory for {@link SimpleMinionClusterIntegrationTest}.
+ */
+@EventObserverFactory
+public class TestEventObserverFactory implements MinionEventObserverFactory {
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return SimpleMinionClusterIntegrationTest.TASK_TYPE;
+ }
+
+ @Override
+ public MinionEventObserver create() {
+ return new MinionEventObserver() {
+ @Override
+ public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+ SimpleMinionClusterIntegrationTest.TASK_START_NOTIFIED.set(true);
+ }
+
+ @Override
+ public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) {
+ assertTrue(executionResult instanceof Boolean);
+ assertTrue((Boolean) executionResult);
+ SimpleMinionClusterIntegrationTest.TASK_SUCCESS_NOTIFIED.set(true);
+ }
+
+ @Override
+ public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
+ SimpleMinionClusterIntegrationTest.TASK_CANCELLED_NOTIFIED.set(true);
+ }
+
+ @Override
+ public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) {
+ SimpleMinionClusterIntegrationTest.TASK_ERROR_NOTIFIED.set(true);
+ }
+ };
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
new file mode 100644
index 0000000..d14509a
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.plugin.minion.tasks;
+
+import java.util.Map;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.minion.exception.TaskCancelledException;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Task executor factory for {@link SimpleMinionClusterIntegrationTest}.
+ */
+@TaskExecutorFactory
+public class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+ @Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+ }
+
+ @Override
+ public String getTaskType() {
+ return SimpleMinionClusterIntegrationTest.TASK_TYPE;
+ }
+
+ @Override
+ public PinotTaskExecutor create() {
+ return new BaseTaskExecutor() {
+ @Override
+ public Boolean executeTask(PinotTaskConfig pinotTaskConfig) {
+ assertTrue(MINION_CONTEXT.getDataDir().exists());
+ assertNotNull(MINION_CONTEXT.getMinionMetrics());
+ assertNotNull(MINION_CONTEXT.getHelixPropertyStore());
+
+ assertEquals(pinotTaskConfig.getTaskType(), SimpleMinionClusterIntegrationTest.TASK_TYPE);
+ Map<String, String> configs = pinotTaskConfig.getConfigs();
+ assertEquals(configs.size(), SimpleMinionClusterIntegrationTest.NUM_CONFIGS);
+ String offlineTableName = configs.get("tableName");
+ assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE);
+ String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
+ assertTrue(rawTableName.equals(SimpleMinionClusterIntegrationTest.TABLE_NAME_1) || rawTableName
+ .equals(SimpleMinionClusterIntegrationTest.TABLE_NAME_2));
+ assertEquals(configs.get("tableType"), TableType.OFFLINE.toString());
+
+ do {
+ if (_cancelled) {
+ throw new TaskCancelledException("Task has been cancelled");
+ }
+ } while (SimpleMinionClusterIntegrationTest.HOLD.get());
+ return true;
+ }
+ };
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
new file mode 100644
index 0000000..740d3e5
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.plugin.minion.tasks;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Task generator for {@link SimpleMinionClusterIntegrationTest}.
+ */
+@TaskGenerator
+public class TestTaskGenerator implements PinotTaskGenerator {
+ private ClusterInfoAccessor _clusterInfoAccessor;
+
+ @Override
+ public void init(ClusterInfoAccessor clusterInfoAccessor) {
+ _clusterInfoAccessor = clusterInfoAccessor;
+ }
+
+ @Override
+ public String getTaskType() {
+ return SimpleMinionClusterIntegrationTest.TASK_TYPE;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+ assertEquals(tableConfigs.size(), SimpleMinionClusterIntegrationTest.NUM_TASKS);
+
+ // Generate at most 2 tasks
+ if (_clusterInfoAccessor.getTaskStates(SimpleMinionClusterIntegrationTest.TASK_TYPE).size()
+ >= SimpleMinionClusterIntegrationTest.NUM_TASKS) {
+ return Collections.emptyList();
+ }
+
+ List<PinotTaskConfig> taskConfigs = new ArrayList<>();
+ for (TableConfig tableConfig : tableConfigs) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("tableName", tableConfig.getTableName());
+ configs.put("tableType", tableConfig.getTableType().toString());
+ taskConfigs.add(new PinotTaskConfig(SimpleMinionClusterIntegrationTest.TASK_TYPE, configs));
+ }
+ return taskConfigs;
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
index 716bdd8..d220f8d 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
@@ -34,6 +34,11 @@ import org.slf4j.LoggerFactory;
public class EventObserverFactoryRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(EventObserverFactoryRegistry.class);
+ /**
+ * The package regex pattern for auto-registered {@link MinionEventObserverFactory}.
+ */
+ private static final String EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
+
private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>();
/**
@@ -43,8 +48,8 @@ public class EventObserverFactoryRegistry {
*/
public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
long startTimeMs = System.currentTimeMillis();
- Set<Class<?>> classes =
- PinotReflectionUtils.getClassesThroughReflection(".*\\.event\\..*", EventObserverFactory.class);
+ Set<Class<?>> classes = PinotReflectionUtils
+ .getClassesThroughReflection(EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN, EventObserverFactory.class);
for (Class<?> clazz : classes) {
EventObserverFactory annotation = clazz.getAnnotation(EventObserverFactory.class);
if (annotation.enabled()) {
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 6b5f2c2..3ba1c40 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -34,9 +34,9 @@ public class TaskExecutorFactoryRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorFactoryRegistry.class);
/**
- * The package regex pattern for auto-registered {@link TaskExecutorFactory}.
+ * The package regex pattern for auto-registered {@link PinotTaskExecutorFactory}.
*/
- private static final String TASK_EXECUTOR_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
+ private static final String TASK_EXECUTOR_FACTORY_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*";
private final Map<String, PinotTaskExecutorFactory> _taskExecutorFactoryRegistry = new HashMap<>();
@@ -68,7 +68,7 @@ public class TaskExecutorFactoryRegistry {
public static Set<Class<?>> getTaskExecutorFactoryClasses() {
return PinotReflectionUtils
- .getClassesThroughReflection(TASK_EXECUTOR_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class);
+ .getClassesThroughReflection(TASK_EXECUTOR_FACTORY_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class);
}
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
index 2aa8d68..df85a32 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java
@@ -29,7 +29,7 @@ import java.lang.annotation.Target;
*
* NOTE:
* - The annotated class must implement the MinionEventObserverFactory interface
- * - The annotated class must be under the package of name 'org.apache.pinot.*.event.*' to be auto-registered.
+ * - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
index 38166b0..a4c6c4d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java
@@ -29,7 +29,7 @@ import java.lang.annotation.Target;
*
* NOTE:
* - The annotated class must implement the PinotTaskExecutorFactory interface
- * - The annotated class must be under the package of name 'org.apache.pinot.*.executor.*' to be auto-registered.
+ * - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
index 33c7511..c615708 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java
@@ -29,7 +29,7 @@ import java.lang.annotation.Target;
*
* NOTE:
* - The annotated class must implement the PinotTaskGenerator interface
- * - The annotated class must be under the package of name 'org.apache.pinot.plugin.minion.tasks.*' to be auto-registered.
+ * - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org