You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/14 08:10:06 UTC
[pinot] branch master updated: create task queue as needed for adhoc task (#8540)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe7247fb96 create task queue as needed for adhoc task (#8540)
fe7247fb96 is described below
commit fe7247fb96c6627eb44f30d6199ffb6fdf5f835f
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Thu Apr 14 01:09:59 2022 -0700
create task queue as needed for adhoc task (#8540)
Co-authored-by: Xiaobing Li <xi...@startree.ai>
---
.../helix/core/minion/PinotTaskManager.java | 5 +-
...mentGenerationMinionClusterIntegrationTest.java | 139 +++++++++++++++++++++
2 files changed, 141 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 308ba5ca63..522cc5e077 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -144,6 +144,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
}
String minionInstanceTag =
taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ addTaskTypeMetricsUpdaterIfNeeded(taskType);
String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
if (taskState != null) {
@@ -175,9 +177,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
throw new UnknownTaskTypeException(
"Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
}
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- addTaskTypeMetricsUpdaterIfNeeded(taskType);
-
// responseMap holds the table to task name mapping.
Map<String, String> responseMap = new HashMap<>();
for (String tableNameWithType : tableNameWithTypes) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..374bb2d617
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SegmentGenerationMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class);
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+ startMinion();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ try {
+ stopMinion();
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+ } finally {
+ FileUtils.deleteQuietly(_tempDir);
+ }
+ }
+
+ @Test
+ public void testAdhocSegmentGenerationAndPushTask()
+ throws Exception {
+ String tableName = "myTable";
+ String tableNameWithType = tableName + "_OFFLINE";
+ addSchemaAndTableConfig(tableName);
+
+ File inputDir = new File(_tempDir, tableName);
+ int rowCnt = prepInputFiles(inputDir, 7, 10);
+ assertEquals(rowCnt, 70);
+
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.getAbsolutePath());
+ taskConfigs.put(BatchConfigProperties.INPUT_FORMAT, "csv");
+ AdhocTaskConfig adhocTaskConfig =
+ new AdhocTaskConfig("SegmentGenerationAndPushTask", tableNameWithType, null, taskConfigs);
+
+ String url = _controllerBaseApiUrl + "/tasks/execute";
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ if (getTotalDocs(tableName) < rowCnt) {
+ // Resubmit only while docs are still missing; resubmitting after all files are ingested causes NoTaskScheduledException.
+ sendPostRequest(url, JsonUtils.objectToString(adhocTaskConfig),
+ Collections.singletonMap("accept", "application/json"));
+ }
+ return getTotalDocs(tableName) == rowCnt;
+ } catch (Exception e) {
+ LOGGER.error("Failed to get expected totalDocs: " + rowCnt, e);
+ return false;
+ }
+ }, 5000L, 600_000L, "Failed to load " + rowCnt + " documents", true);
+ JsonNode result = postQuery("SELECT COUNT(*) FROM " + tableName, _brokerBaseApiUrl);
+ // Each of the 7 input files is expected to produce exactly one segment.
+ assertEquals(result.get("numSegmentsQueried").asInt(), 7);
+ }
+
+ private void addSchemaAndTableConfig(String tableName)
+ throws Exception {
+ addSchema(new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("id", FieldSpec.DataType.INT)
+ .addSingleValueDimension("name", FieldSpec.DataType.STRING).build());
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build();
+ sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toString(),
+ BasicAuthTestUtils.AUTH_HEADER);
+ }
+
+ private int prepInputFiles(File inputDir, int fileNum, int rowsPerFile)
+ throws Exception {
+ int rowCnt = 0;
+ for (int i = 0; i < fileNum; i++) {
+ File csvFile = new File(inputDir, String.format("tempFile_%05d.csv", i));
+ FileUtils.write(csvFile, "id,name\n", false);
+ for (int j = 0; j < rowsPerFile; j++) {
+ FileUtils.write(csvFile, String.format("%d,n%d\n", rowCnt, rowCnt), true);
+ rowCnt++;
+ }
+ }
+ return rowCnt;
+ }
+
+ private int getTotalDocs(String tableName)
+ throws Exception {
+ String query = "SELECT COUNT(*) FROM " + tableName;
+ JsonNode response = postQuery(query, _brokerBaseApiUrl);
+ JsonNode resTbl = response.get("resultTable");
+ return (resTbl == null) ? 0 : resTbl.get("rows").get(0).get(0).asInt();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org