You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/20 20:07:21 UTC

[incubator-pinot] branch adding_cluster_config_for_minion_SegmentGenerationAndPushTaskGenerator created (now 23d0ebd)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch adding_cluster_config_for_minion_SegmentGenerationAndPushTaskGenerator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 23d0ebd  Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance

This branch includes the following new commits:

     new 23d0ebd  Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_cluster_config_for_minion_SegmentGenerationAndPushTaskGenerator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 23d0ebdfa35bda874fc886fbe6cccd2d0ab4bfa3
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jan 20 12:06:47 2021 -0800

    Adding cluster config to config minion task: SegmentGenerationAndPushTaskGenerator number of concurrent tasks per instance
---
 .../helix/core/minion/ClusterInfoAccessor.java     | 19 ++++++++
 .../SegmentGenerationAndPushTaskGenerator.java     | 16 +++++++
 .../SegmentGenerationAndPushTaskGeneratorTest.java | 53 ++++++++++++++++++++++
 .../apache/pinot/core/common/MinionConstants.java  |  2 +
 4 files changed, 90 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 8d3db71..7d4aad8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -157,4 +160,20 @@ public class ClusterInfoAccessor {
   public String getVipUrl() {
     return _controllerConf.generateVipUrl();
   }
+
+  /**
+   * Get the cluster config for a given cluster config.
+   *
+   * @return cluster config
+   */
+  public String getClusterConfig(String configName) {
+    HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+        .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+    Map<String, String> configMap =
+        _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Arrays.asList(configName));
+    if (configMap == null || !configMap.containsKey(configName)) {
+      return null;
+    }
+    return configMap.get(configName);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
index 41b7740..face245 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
@@ -104,6 +105,21 @@ public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator
   }
 
   @Override
+  public int getNumConcurrentTasksPerInstance() {
+    String numConcurrentTasksPerInstance = _clusterInfoAccessor
+        .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE);
+    if (numConcurrentTasksPerInstance != null) {
+      try {
+        return Integer.parseInt(numConcurrentTasksPerInstance);
+      } catch (Exception e) {
+        LOGGER.error("Failed to parse cluster config: {}",
+            MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE, e);
+      }
+    }
+    return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  }
+
+  @Override
   public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
new file mode 100644
index 0000000..b39e088
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGeneratorTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link SegmentGenerationAndPushTaskGeneratorTest}
+ */
+public class SegmentGenerationAndPushTaskGeneratorTest {
+
+  @Test
+  public void testNumberConcurrentTasksPerInstance() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide
+        .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE))
+        .thenReturn("5");
+    SegmentGenerationAndPushTaskGenerator generator = new SegmentGenerationAndPushTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 5);
+    when(mockClusterInfoProvide
+        .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE))
+        .thenReturn(null);
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 1);
+    when(mockClusterInfoProvide
+        .getClusterConfig(MinionConstants.SegmentGenerationAndPushTask.CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE))
+        .thenReturn("abcd");
+    Assert.assertEquals(generator.getNumConcurrentTasksPerInstance(), 1);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 546a9fb..8abae73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,8 @@ public class MinionConstants {
   // Generate segment and push to controller based on batch ingestion configs
   public static class SegmentGenerationAndPushTask {
     public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+    public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
+        "pinot.minion.task.generator.SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org