You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "saurabhd336 (via GitHub)" <gi...@apache.org> on 2023/11/22 06:33:31 UTC

[PR] Allow tenant level config override [pinot]

saurabhd336 opened a new pull request, #12039:
URL: https://github.com/apache/pinot/pull/12039

   Allows overriding server configs for a specific tenant by setting tenant prefixed config in /cluster/configs or server.conf file following this convention
   ```
   "pinot.tenant.SomeServerTenant.<Server Config Key>": <Value>
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow tenant level config override [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on PR #12039:
URL: https://github.com/apache/pinot/pull/12039#issuecomment-1859227188

   > We probably want to separate the tenant config for brokers and servers. Since the tag is different, we may consider applying config based on instance tag. We may use `pinot.tag.<tagName>.<config>`, where `pinot.tag.<tagName>` will be replaced to `pinot.broker` or `pinot.server`
   
   Sorry I don't follow. Right now I had only added support for overriding server conf for servers tagged with a specific tenant. 
   Your suggestion is to
   
   1) Allow overriding conf for all components (servers, brokers, controllers, minions)
   2) Use instance tag instead of tenant name for the override
   
   Is this correct? #1 makes sense to me, I'll add the changes to support all components. #2, I'm not sure why using tenant names directly for the override would not work?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow tenant level config override [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12039:
URL: https://github.com/apache/pinot/pull/12039#issuecomment-1822227336

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `58 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`e4d3a8d`)](https://app.codecov.io/gh/apache/pinot/commit/e4d3a8dde8df8be8fcc3ae20c96ec6d004dd7933?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.64% compared to head [(`7dcbba7`)](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 34.93%.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...ache/pinot/common/utils/ServiceStartableUtils.java](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU2VydmljZVN0YXJ0YWJsZVV0aWxzLmphdmE=) | 0.00% | [41 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../pinot/server/starter/helix/BaseServerStarter.java](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zZXJ2ZXIvc3RhcnRlci9oZWxpeC9CYXNlU2VydmVyU3RhcnRlci5qYXZh) | 0.00% | [17 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12039       +/-   ##
   =============================================
   - Coverage     61.64%   34.93%   -26.71%     
   =============================================
     Files          2385     2309       -76     
     Lines        129519   125794     -3725     
     Branches      20043    19489      -554     
   =============================================
   - Hits          79844    43950    -35894     
   - Misses        43869    78728    +34859     
   + Partials       5806     3116     -2690     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-0.01%)` | :arrow_down: |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (ø)` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.59%)` | :arrow_down: |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.93% <0.00%> (-26.58%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.92% <0.00%> (-26.68%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.91% <0.00%> (-26.58%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `34.93% <0.00%> (-26.71%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.79% <0.00%> (-14.85%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.79% <0.00%> (-0.20%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12039/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12039?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow instance tag level config override [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12039:
URL: https://github.com/apache/pinot/pull/12039#discussion_r1432001044


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping overriding tenant configs", instanceId);
+      return;
+    }
+    InstanceConfig instanceZKConfig = new InstanceConfig(instanceConfigZNRecord);
+    Set<String> instanceTags = instanceZKConfig.getTags().stream()
+        .map(PinotConfiguration::relaxPropertyName)
+        .collect(Collectors.toSet());
 
-      Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
-      String instanceConfigKeyPrefix =
-          String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
-      for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
-          String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
-          addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
-        } else {
-          // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
-          //       only put keys with the instance config key prefix.
-          addConfigIfNotExists(instanceConfig, key, value);
+    for (String key : instanceConfig.getKeys()) {
+      if (key.startsWith(PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = key.substring(PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX.length());
+        String tag = instanceConfigKey.substring(0, instanceConfigKey.indexOf('.'));
+        String tagKey = instanceConfigKey.substring(tag.length() + 1);
+        if (instanceTags.contains(tag)) {
+          instanceConfig.setProperty(tagKey, instanceConfig.getProperty(key));

Review Comment:
   Do we need the complete key here? Or we just need the part after `pinot.<type>`?
   E.g. Do we want `pinot.tag.myTenant_OFFLINE.pinot.server.query.executor.abc` or `pinot.tag.myTenant_OFFLINE.query.executor.abc`?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping overriding tenant configs", instanceId);

Review Comment:
   This is common when the node joins the cluster for the first time. We probably want to log info instead



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -135,34 +136,44 @@ public void init(PinotConfiguration brokerConf)
     // Remove all white-spaces from the list of zkServers (if any).
     _zkServers = brokerConf.getProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER).replaceAll("\\s+", "");
     _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
-    ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER);
+    ZkClient zkClient = ServiceStartableUtils.getZKClient(_brokerConf, _zkServers);
+    try {
+      ServiceStartableUtils.applyClusterConfig(_brokerConf, zkClient, _clusterName, ServiceRole.BROKER);
+      _hostname = brokerConf.getProperty(Broker.CONFIG_OF_BROKER_HOSTNAME);
+      if (_hostname == null) {
+        _hostname =
+            _brokerConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils.getHostnameOrAddress()
+                : NetUtils.getHostAddress();
+      }
+
+      if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+          MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) {
+        _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort());
+      }
+      _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf);
+
+      // Override multi-stage query runner hostname if not set explicitly
+      if (!_brokerConf.containsKey(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME)) {
+        _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, _hostname);
+      }
+      _port = _listenerConfigs.get(0).getPort();
+
+      _instanceId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID);
+      if (_instanceId == null) {
+        _instanceId = _brokerConf.getProperty(Helix.Instance.INSTANCE_ID_KEY);
+      }
+      if (_instanceId == null) {
+        _instanceId = Helix.PREFIX_OF_BROKER_INSTANCE + _hostname + "_" + _port;
+      }
 
-    if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
-        MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) {
-      _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort());
+      ServiceStartableUtils.applyTenantConfigs(_instanceId, zkClient, _clusterName, _brokerConf);

Review Comment:
   Since this requires reading the `InstanceConfig` (automatically created when the node joins the cluster for the first time) from ZK, we can consider moving it into `start()` after the instance is connected to the cluster. At that moment, we don't need to create another `ZkClient`.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping overriding tenant configs", instanceId);
+      return;
+    }
+    InstanceConfig instanceZKConfig = new InstanceConfig(instanceConfigZNRecord);
+    Set<String> instanceTags = instanceZKConfig.getTags().stream()
+        .map(PinotConfiguration::relaxPropertyName)

Review Comment:
   Why do we want to relax property name here? This is ZK config, which should have the exact name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow tenant level config override [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #12039:
URL: https://github.com/apache/pinot/pull/12039#discussion_r1409728866


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +40,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TENANT_LEVEL_CONFIG_KEY_PREFIX = "pinot.tenant.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void overrideTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,

Review Comment:
   call this `applyTenantConfig` to be a bit more consistent with the `applyClusterConfig`, and can extract the logic of getting `tenantsRelaxedNames` to a helper method to simplify `applyTenantConfig` method
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow tenant level config override [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12039:
URL: https://github.com/apache/pinot/pull/12039#issuecomment-1862013778

   Tenant to tag mapping is not one to one for servers (e.g `myTenant_OFFLINE` and `myTenant_REALTIME`), so using tenant name will lose the flexibility of putting different config for `OFFLINE` and `REALTIME` tenant. Also, from tenant name we cannot tell whether the tenant is server or broker.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org