You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/14 08:10:06 UTC

[pinot] branch master updated: create task queue as needed for adhoc task (#8540)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe7247fb96 create task queue as needed for adhoc task (#8540)
fe7247fb96 is described below

commit fe7247fb96c6627eb44f30d6199ffb6fdf5f835f
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Thu Apr 14 01:09:59 2022 -0700

    create task queue as needed for adhoc task (#8540)
    
    Co-authored-by: Xiaobing Li <xi...@startree.ai>
---
 .../helix/core/minion/PinotTaskManager.java        |   5 +-
 ...mentGenerationMinionClusterIntegrationTest.java | 139 +++++++++++++++++++++
 2 files changed, 141 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 308ba5ca63..522cc5e077 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -144,6 +144,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     }
     String minionInstanceTag =
         taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
     String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
     TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
     if (taskState != null) {
@@ -175,9 +177,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
       throw new UnknownTaskTypeException(
           "Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
     }
-    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    addTaskTypeMetricsUpdaterIfNeeded(taskType);
-
     // responseMap holds the table to task name mapping.
     Map<String, String> responseMap = new HashMap<>();
     for (String tableNameWithType : tableNameWithTypes) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..374bb2d617
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SegmentGenerationMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class);
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    startMinion();
+  }
+
+  @AfterClass
+  public void tearDown() {
+    try {
+      stopMinion();
+      stopServer();
+      stopBroker();
+      stopController();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
+  }
+
+  @Test
+  public void testAdhocSegmentGenerationAndPushTask()
+      throws Exception {
+    String tableName = "myTable";
+    String tableNameWithType = tableName + "_OFFLINE";
+    addSchemaAndTableConfig(tableName);
+
+    File inputDir = new File(_tempDir, tableName);
+    int rowCnt = prepInputFiles(inputDir, 7, 10);
+    assertEquals(rowCnt, 70);
+
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.getAbsolutePath());
+    taskConfigs.put(BatchConfigProperties.INPUT_FORMAT, "csv");
+    AdhocTaskConfig adhocTaskConfig =
+        new AdhocTaskConfig("SegmentGenerationAndPushTask", tableNameWithType, null, taskConfigs);
+
+    String url = _controllerBaseApiUrl + "/tasks/execute";
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        if (getTotalDocs(tableName) < rowCnt) {
+          // To avoid the NoTaskScheduledException after all files are ingested.
+          sendPostRequest(url, JsonUtils.objectToString(adhocTaskConfig),
+              Collections.singletonMap("accept", "application/json"));
+        }
+        return getTotalDocs(tableName) == rowCnt;
+      } catch (Exception e) {
+        LOGGER.error("Failed to get expected totalDocs: " + rowCnt, e);
+        return false;
+      }
+    }, 5000L, 600_000L, "Failed to load " + rowCnt + " documents", true);
+    JsonNode result = postQuery("SELECT COUNT(*) FROM " + tableName, _brokerBaseApiUrl);
+    // One segment per file.
+    assertEquals(result.get("numSegmentsQueried").asInt(), 7);
+  }
+
+  private void addSchemaAndTableConfig(String tableName)
+      throws Exception {
+    addSchema(new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("id", FieldSpec.DataType.INT)
+        .addSingleValueDimension("name", FieldSpec.DataType.STRING).build());
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build();
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toString(),
+        BasicAuthTestUtils.AUTH_HEADER);
+  }
+
+  private int prepInputFiles(File inputDir, int fileNum, int rowsPerFile)
+      throws Exception {
+    int rowCnt = 0;
+    for (int i = 0; i < fileNum; i++) {
+      File csvFile = new File(inputDir, String.format("tempFile_%05d.csv", i));
+      FileUtils.write(csvFile, "id,name\n", false);
+      for (int j = 0; j < rowsPerFile; j++) {
+        FileUtils.write(csvFile, String.format("%d,n%d\n", rowCnt, rowCnt), true);
+        rowCnt++;
+      }
+    }
+    return rowCnt;
+  }
+
+  private int getTotalDocs(String tableName)
+      throws Exception {
+    String query = "SELECT COUNT(*) FROM " + tableName;
+    JsonNode response = postQuery(query, _brokerBaseApiUrl);
+    JsonNode resTbl = response.get("resultTable");
+    return (resTbl == null) ? 0 : resTbl.get("rows").get(0).get(0).asInt();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org