You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/20 20:07:22 UTC

[incubator-pinot] 01/01: Adding cluster config to configure the number of concurrent tasks per instance for the minion task generator SegmentGenerationAndPushTaskGenerator

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_cluster_config_for_minion_SegmentGenerationAndPushTaskGenerator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 23d0ebdfa35bda874fc886fbe6cccd2d0ab4bfa3
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jan 20 12:06:47 2021 -0800

    Adding cluster config to configure the number of concurrent tasks per instance for the minion task generator SegmentGenerationAndPushTaskGenerator
---
 .../helix/core/minion/ClusterInfoAccessor.java     | 19 ++++++++
 .../SegmentGenerationAndPushTaskGenerator.java     | 16 +++++++
 .../SegmentGenerationAndPushTaskGeneratorTest.java | 53 ++++++++++++++++++++++
 .../apache/pinot/core/common/MinionConstants.java  |  2 +
 4 files changed, 90 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 8d3db71..7d4aad8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -157,4 +160,20 @@ public class ClusterInfoAccessor {
   public String getVipUrl() {
     return _controllerConf.generateVipUrl();
   }
+
+  /**
+   * Reads a single entry from the cluster-scoped Helix config.
+   *
+   * @param configName key of the cluster config entry to look up
+   * @return the value stored under {@code configName}, or {@code null} when Helix returns no config map or the map
+   *         has no such key
+   */
+  public String getClusterConfig(String configName) {
+    HelixConfigScopeBuilder scopeBuilder = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER);
+    HelixConfigScope clusterScope = scopeBuilder.forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+    Map<String, String> values =
+        _pinotHelixResourceManager.getHelixAdmin().getConfig(clusterScope, Arrays.asList(configName));
+    // Map#get already yields null for a missing key, so only the map itself needs a null guard.
+    return values == null ? null : values.get(configName);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index 41b7740..face245 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
@@ -104,6 +105,21 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
   }
 
   @Override
+  public int getNumConcurrentTasksPerInstance() {
+    String numConcurrentTasksPerInstance = _clusterInfoAccessor
+        .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE);
+    if (numConcurrentTasksPerInstance != null) {
+      try {
+        return Integer.parseInt(numConcurrentTasksPerInstance);
+      } catch (Exception e) {
+        LOGGER.error("Failed to parse cluster config: {}",
+            MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e);
+      }
+    }
+    return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  }
+
+  @Override
   public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
new file mode 100644
index 0000000..b39e088
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link SegmentGenerationAndPushTaskGenerator}
+ */
+public class SegmentGenerationAndPushTaskGeneratorTest {
+
+  @Test
+  public void testNumberConcurrentTasksPerInstance() {
+    String configKey = MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE;
+    ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
+    // A numeric cluster config value is returned as the number of concurrent tasks.
+    when(clusterInfoAccessor.getClusterConfig(configKey)).thenReturn("5");
+    SegmentGenerationAndPushTaskGenerator generator = new SegmentGenerationAndPushTaskGenerator();
+    generator.init(clusterInfoAccessor);
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 5);
+
+    // Without a cluster config value the generator reports 1.
+    when(clusterInfoAccessor.getClusterConfig(configKey)).thenReturn(null);
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 1);
+
+    // A non-numeric cluster config value also makes the generator report 1.
+    when(clusterInfoAccessor.getClusterConfig(configKey)).thenReturn("abcd");
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 1);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 546a9fb..8abae73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,8 @@ public class MinionConstants {
   // Generate segment and push to controller based on batch ingestion configs
   public static class SegmentGenerationAndPushTask {
     public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+    public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE = // Helix cluster config key
+        "pinot.minion.task.generator.SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org