You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/21 23:21:05 UTC

[incubator-pinot] branch master updated: Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f17be35  Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468)
f17be35 is described below

commit f17be35c348686c8041f0c3e2aef87f62e2cc97a
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Jan 21 15:20:46 2021 -0800

    Adding cluster config to config number of concurrent tasks per instance for minion task: SegmentGenerationAndPushTaskGenerator (#6468)
    
    * Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance
    
    * Switch to real controller test
    
    * Address comments
---
 .../helix/ControllerRequestURLBuilder.java         |  4 ++
 .../helix/core/minion/ClusterInfoAccessor.java     | 16 +++++
 .../SegmentGenerationAndPushTaskGenerator.java     | 16 +++++
 .../SegmentGenerationAndPushTaskGeneratorTest.java | 80 ++++++++++++++++++++++
 .../apache/pinot/core/common/MinionConstants.java  |  2 +
 5 files changed, 118 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index c77ae3b..f938e0b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -332,4 +332,8 @@ public class ControllerRequestURLBuilder {
             .collect(Collectors.joining(",", "{", "}"));
     return forIngestFromURI(tableNameWithType, batchConfigMapStr, sourceURIStr);
   }
+
+  public String forClusterConfigs() {
+    return StringUtil.join("/", _baseUrl, "cluster/configs");
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 8d3db71..7b1ff36 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -157,4 +160,17 @@ public class ClusterInfoAccessor {
   public String getVipUrl() {
     return _controllerConf.generateVipUrl();
   }
+
+  /**
+   * Get the cluster config for a given config name, return null if not found.
+   *
+   * @return cluster config
+   */
+  public String getClusterConfig(String configName) {
+    HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+        .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+    Map<String, String> configMap =
+        _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName));
+    return configMap != null ? configMap.get(configName) : null;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index 41b7740..3ea3d31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
@@ -104,6 +105,21 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
   }
 
   @Override
+  public int getNumConcurrentTasksPerInstance() {
+    String numConcurrentTasksPerInstanceStr = _clusterInfoAccessor
+        .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE);
+    if (numConcurrentTasksPerInstanceStr != null) {
+      try {
+        return Integer.parseInt(numConcurrentTasksPerInstanceStr);
+      } catch (Exception e) {
+        LOGGER.error("Failed to parse cluster config: {}",
+            MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e);
+      }
+    }
+    return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  }
+
+  @Override
   public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
new file mode 100644
index 0000000..5a81ef1
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link SegmentGenerationAndPushTaskGeneratorTest}
+ */
+public class SegmentGenerationAndPushTaskGeneratorTest extends ControllerTest {
+  SegmentGenerationAndPushTaskGenerator _generator;
+
+  @BeforeClass
+  public void setup() {
+    int zkPort = 2171;
+    startZk(zkPort);
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    properties.put(ControllerConf.ZK_STR, "localhost:" + zkPort);
+    properties.put(ControllerConf.HELIX_CLUSTER_NAME, SegmentGenerationAndPushTaskGeneratorTest.class.getSimpleName());
+    properties.put(ControllerConf.CONTROLLER_PORT, 28998);
+    startController(properties);
+
+    ClusterInfoAccessor clusterInfoAccessor = _controllerStarter.getTaskManager().getClusterInfoAccessor();
+    _generator = new SegmentGenerationAndPushTaskGenerator();
+    _generator.init(clusterInfoAccessor);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopController();
+    stopZk();
+  }
+
+  @Test
+  public void testRealCluster()
+      throws Exception {
+    // Default is 1
+    Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1);
+
+    // Set config to 5
+    String request = JsonUtils.objectToString(Collections
+        .singletonMap(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, "5"));
+    sendPostRequest(_controllerRequestURLBuilder.forClusterConfigs(), request);
+    Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 5);
+
+    // Set config to invalid and should still get 1
+    request = JsonUtils.objectToString(Collections
+        .singletonMap(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE,
+            "abcd"));
+    sendPostRequest(_controllerRequestURLBuilder.forClusterConfigs(), request);
+    Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 546a9fb..d9d1a9d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,8 @@ public class MinionConstants {
   // Generate segment and push to controller based on batch ingestion configs
   public static class SegmentGenerationAndPushTask {
     public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+    public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
+        "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org