You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/20 20:07:22 UTC
[incubator-pinot] 01/01: Adding cluster config to config minion
task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per
instance
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch adding_cluster_config_for_minion_SegmentGenerationAndPushTaskGenerator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 23d0ebdfa35bda874fc886fbe6cccd2d0ab4bfa3
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jan 20 12:06:47 2021 -0800
Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance
---
.../helix/core/minion/ClusterInfoAccessor.java | 19 ++++++++
.../SegmentGenerationAndPushTaskGenerator.java | 16 +++++++
.../SegmentGenerationAndPushTaskGeneratorTest.java | 53 ++++++++++++++++++++++
.../apache/pinot/core/common/MinionConstants.java | 2 +
4 files changed, 90 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 8d3db71..7d4aad8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -157,4 +160,20 @@ public class ClusterInfoAccessor {
public String getVipUrl() {
return _controllerConf.generateVipUrl();
}
+
+ /**
+  * Gets the value of a cluster-level Helix config.
+  *
+  * @param configName name of the cluster config to look up
+  * @return value of the config, or {@code null} if no value is found for it
+  */
+ @Nullable
+ public String getClusterConfig(String configName) {
+   HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+       .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+   Map<String, String> configMap =
+       _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Arrays.asList(configName));
+   // Map.get() already returns null for an absent key, so only the map itself needs a null check
+   return configMap != null ? configMap.get(configName) : null;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index 41b7740..face245 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
@@ -104,6 +105,21 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
}
@Override
+ public int getNumConcurrentTasksPerInstance() {
+ String numConcurrentTasksPerInstance = _clusterInfoAccessor
+ .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE);
+ if (numConcurrentTasksPerInstance != null) {
+ try {
+ return Integer.parseInt(numConcurrentTasksPerInstance);
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse cluster config: {}",
+ MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e);
+ }
+ }
+ return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ }
+
+ @Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
new file mode 100644
index 0000000..b39e088
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link SegmentGenerationAndPushTaskGenerator}.
+ */
+public class SegmentGenerationAndPushTaskGeneratorTest {
+
+  /**
+   * Verifies that the number of concurrent tasks per instance is taken from the cluster config when it holds a
+   * valid integer, and that the generator returns 1 when the config is missing or cannot be parsed.
+   */
+  @Test
+  public void testNumberConcurrentTasksPerInstance() {
+    String configKey = MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE;
+    ClusterInfoAccessor mockClusterInfoAccessor = mock(ClusterInfoAccessor.class);
+
+    // A valid integer value in the cluster config is returned as is
+    when(mockClusterInfoAccessor.getClusterConfig(configKey)).thenReturn("5");
+    SegmentGenerationAndPushTaskGenerator generator = new SegmentGenerationAndPushTaskGenerator();
+    generator.init(mockClusterInfoAccessor);
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 5);
+
+    // A missing cluster config yields 1
+    when(mockClusterInfoAccessor.getClusterConfig(configKey)).thenReturn(null);
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 1);
+
+    // A non-numeric cluster config also yields 1
+    when(mockClusterInfoAccessor.getClusterConfig(configKey)).thenReturn("abcd");
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 1);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 546a9fb..8abae73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,8 @@ public class MinionConstants {
// Generate segment and push to controller based on batch ingestion configs
public static class SegmentGenerationAndPushTask {
public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+ public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
+ "pinot.minion.task.generator.SegmentGenerationAndPushTask.numConcurrentTasksPerInstance"; // cluster config key
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org