You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/21 23:21:05 UTC
[incubator-pinot] branch master updated: Adding cluster config to
config number of concurrent tasks per instance for minion task:
SegmentGenerationAndPushTaskGenerator (#6468)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f17be35 Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468)
f17be35 is described below
commit f17be35c348686c8041f0c3e2aef87f62e2cc97a
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Jan 21 15:20:46 2021 -0800
Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468)
* Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance
* Switch to real controller test
* Address comments
---
.../helix/ControllerRequestURLBuilder.java | 4 ++
.../helix/core/minion/ClusterInfoAccessor.java | 16 +++++
.../SegmentGenerationAndPushTaskGenerator.java | 16 +++++
.../SegmentGenerationAndPushTaskGeneratorTest.java | 80 ++++++++++++++++++++++
.../apache/pinot/core/common/MinionConstants.java | 2 +
5 files changed, 118 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index c77ae3b..f938e0b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -332,4 +332,8 @@ public class ControllerRequestURLBuilder {
.collect(Collectors.joining(",", "{", "}"));
return forIngestFromURI(tableNameWithType, batchConfigMapStr, sourceURIStr);
}
+
+ public String forClusterConfigs() {
+ return StringUtil.join("/", _baseUrl, "cluster/configs");
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 8d3db71..7b1ff36 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -157,4 +160,17 @@ public class ClusterInfoAccessor {
public String getVipUrl() {
return _controllerConf.generateVipUrl();
}
+
+ /**
+  * Looks up a single cluster-scoped Helix config value by name.
+  *
+  * @param configName name (key) of the cluster config to look up
+  * @return the value stored under {@code configName}, or {@code null} when Helix returns no config map or the
+  *         returned map has no entry for that name
+  */
+ @Nullable
+ public String getClusterConfig(String configName) {
+   // Scope the lookup to the cluster level of the Helix cluster managed by this controller.
+   HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+       .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+   Map<String, String> configMap =
+       _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName));
+   // Guard against a missing config map so callers only ever have to handle a null value.
+   return configMap != null ? configMap.get(configName) : null;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index 41b7740..3ea3d31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
@@ -104,6 +105,21 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
}
@Override
+ public int getNumConcurrentTasksPerInstance() {
+ String numConcurrentTasksPerInstanceStr = _clusterInfoAccessor
+ .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE);
+ if (numConcurrentTasksPerInstanceStr != null) {
+ try {
+ return Integer.parseInt(numConcurrentTasksPerInstanceStr);
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse cluster config: {}",
+ MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e);
+ }
+ }
+ return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ }
+
+ @Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
new file mode 100644
index 0000000..5a81ef1
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link SegmentGenerationAndPushTaskGenerator}, run against the ZooKeeper and controller that
+ * {@link #setup()} starts.
+ */
+public class SegmentGenerationAndPushTaskGeneratorTest extends ControllerTest {
+  // Ports used by this test's own ZooKeeper and controller.
+  private static final int TEST_ZK_PORT = 2171;
+  private static final int TEST_CONTROLLER_PORT = 28998;
+
+  private SegmentGenerationAndPushTaskGenerator _generator;
+
+  /**
+   * Starts ZooKeeper and a controller, then initializes the generator under test with the controller's
+   * {@link ClusterInfoAccessor}.
+   */
+  @BeforeClass
+  public void setup() {
+    startZk(TEST_ZK_PORT);
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    properties.put(ControllerConf.ZK_STR, "localhost:" + TEST_ZK_PORT);
+    properties.put(ControllerConf.HELIX_CLUSTER_NAME, SegmentGenerationAndPushTaskGeneratorTest.class.getSimpleName());
+    properties.put(ControllerConf.CONTROLLER_PORT, TEST_CONTROLLER_PORT);
+    startController(properties);
+
+    ClusterInfoAccessor clusterInfoAccessor = _controllerStarter.getTaskManager().getClusterInfoAccessor();
+    _generator = new SegmentGenerationAndPushTaskGenerator();
+    _generator.init(clusterInfoAccessor);
+  }
+
+  /**
+   * Stops the controller and ZooKeeper started in {@link #setup()}.
+   */
+  @AfterClass
+  public void tearDown() {
+    stopController();
+    stopZk();
+  }
+
+  /**
+   * Verifies the default value, a valid override, and the fallback to the default on an unparsable value.
+   */
+  @Test
+  public void testRealCluster()
+      throws Exception {
+    // Default is 1
+    Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1);
+
+    // Set config to 5
+    postNumConcurrentTasksPerInstanceConfig("5");
+    Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 5);
+
+    // Set config to invalid and should still get 1
+    postNumConcurrentTasksPerInstanceConfig("abcd");
+    Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1);
+  }
+
+  /**
+   * Posts the given value for the concurrent-tasks-per-instance cluster config to the controller.
+   *
+   * @param value raw config value to store
+   */
+  private void postNumConcurrentTasksPerInstanceConfig(String value)
+      throws Exception {
+    String request = JsonUtils.objectToString(Collections
+        .singletonMap(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, value));
+    sendPostRequest(_controllerRequestURLBuilder.forClusterConfigs(), request);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 546a9fb..d9d1a9d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,8 @@ public class MinionConstants {
// Generate segment and push to controller based on batch ingestion configs
public static class SegmentGenerationAndPushTask {
public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+ // Cluster config key holding the number of concurrent tasks per instance for this task type;
+ // the value is stored as a string and parsed as an integer by the task generator.
+ public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
+ "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org