You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/12/19 22:37:19 UTC

Re: [PR] Allow instance tag level config override [pinot]

Jackie-Jiang commented on code in PR #12039:
URL: https://github.com/apache/pinot/pull/12039#discussion_r1432001044


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping overriding tenant configs", instanceId);
+      return;
+    }
+    InstanceConfig instanceZKConfig = new InstanceConfig(instanceConfigZNRecord);
+    Set<String> instanceTags = instanceZKConfig.getTags().stream()
+        .map(PinotConfiguration::relaxPropertyName)
+        .collect(Collectors.toSet());
 
-      Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
-      String instanceConfigKeyPrefix =
-          String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
-      for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
-        String key = entry.getKey();
-        String value = entry.getValue();
-        if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
-          String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
-          addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
-        } else {
-          // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
-          //       only put keys with the instance config key prefix.
-          addConfigIfNotExists(instanceConfig, key, value);
+    for (String key : instanceConfig.getKeys()) {
+      if (key.startsWith(PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = key.substring(PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX.length());
+        String tag = instanceConfigKey.substring(0, instanceConfigKey.indexOf('.'));
+        String tagKey = instanceConfigKey.substring(tag.length() + 1);
+        if (instanceTags.contains(tag)) {
+          instanceConfig.setProperty(tagKey, instanceConfig.getProperty(key));

Review Comment:
   Do we need the complete key here, or do we just need the part after `pinot.<type>`?
   E.g. do we want `pinot.tag.myTenant_OFFLINE.pinot.server.query.executor.abc` or `pinot.tag.myTenant_OFFLINE.query.executor.abc`?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping overriding tenant configs", instanceId);

Review Comment:
   This is common when the node joins the cluster for the first time. We probably want to log at INFO level instead.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -135,34 +136,44 @@ public void init(PinotConfiguration brokerConf)
     // Remove all white-spaces from the list of zkServers (if any).
     _zkServers = brokerConf.getProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER).replaceAll("\\s+", "");
     _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
-    ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER);
+    ZkClient zkClient = ServiceStartableUtils.getZKClient(_brokerConf, _zkServers);
+    try {
+      ServiceStartableUtils.applyClusterConfig(_brokerConf, zkClient, _clusterName, ServiceRole.BROKER);
+      _hostname = brokerConf.getProperty(Broker.CONFIG_OF_BROKER_HOSTNAME);
+      if (_hostname == null) {
+        _hostname =
+            _brokerConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils.getHostnameOrAddress()
+                : NetUtils.getHostAddress();
+      }
+
+      if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+          MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) {
+        _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort());
+      }
+      _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf);
+
+      // Override multi-stage query runner hostname if not set explicitly
+      if (!_brokerConf.containsKey(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME)) {
+        _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, _hostname);
+      }
+      _port = _listenerConfigs.get(0).getPort();
+
+      _instanceId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID);
+      if (_instanceId == null) {
+        _instanceId = _brokerConf.getProperty(Helix.Instance.INSTANCE_ID_KEY);
+      }
+      if (_instanceId == null) {
+        _instanceId = Helix.PREFIX_OF_BROKER_INSTANCE + _hostname + "_" + _port;
+      }
 
-    if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
-        MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) {
-      _brokerConf.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort());
+      ServiceStartableUtils.applyTenantConfigs(_instanceId, zkClient, _clusterName, _brokerConf);

Review Comment:
   Since this requires reading the `InstanceConfig` (automatically created when the node joins the cluster for the first time) from ZK, we can consider moving it into `start()` after the instance is connected to the cluster. At that moment, we don't need to create another `ZkClient`.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java:
##########
@@ -36,56 +39,91 @@ private ServiceStartableUtils() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ServiceStartableUtils.class);
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
+  private static final String INSTANCE_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/PARTICIPANT/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+  private static final String PINOT_TAG_LEVEL_CONFIG_KEY_PREFIX = "pinot.tag.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
 
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+      ServiceRole serviceRole) {
+    ZkClient zkClient = getZKClient(instanceConfig, zkAddress);
+    try {
+      applyClusterConfig(instanceConfig, zkClient, clusterName, serviceRole);
+    } finally {
+      zkClient.close();
+    }
+  }
+
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
    *
    * In the ZK cluster config:
    * - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
    */
-  public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
+  public static void applyClusterConfig(PinotConfiguration instanceConfig, ZkClient zkClient, String clusterName,
       ServiceRole serviceRole) {
+    ZNRecord clusterConfigZNRecord =
+        zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
+    if (clusterConfigZNRecord == null) {
+      LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
+      return;
+    }
+
+    Map<String, String> clusterConfigs = clusterConfigZNRecord.getSimpleFields();
+    String instanceConfigKeyPrefix =
+        String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE, serviceRole.name().toLowerCase());
+    for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+        String instanceConfigKey = instanceConfigKeyPrefix + key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+        addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+      } else {
+        // TODO: Currently it puts all keys to the instance config. Consider standardizing instance config keys and
+        //       only put keys with the instance config key prefix.
+        addConfigIfNotExists(instanceConfig, key, value);
+      }
+    }
+  }
+
+  public static ZkClient getZKClient(PinotConfiguration instanceConfig, String zkAddress) {
     int zkClientSessionConfig =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
     int zkClientConnectionTimeoutMs =
         instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
             CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
-    ZkClient zkClient = new ZkClient.Builder()
-        .setZkSerializer(new ZNRecordSerializer())
-        .setZkServer(zkAddress)
-        .setConnectionTimeout(zkClientConnectionTimeoutMs)
-        .setSessionTimeout(zkClientSessionConfig)
-        .build();
+    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs).setSessionTimeout(zkClientSessionConfig).build();
     zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
+    return zkClient;
+  }
 
-    try {
-      ZNRecord clusterConfigZNRecord =
-          zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
-      if (clusterConfigZNRecord == null) {
-        LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
-        return;
-      }
+  /**
+   * Overrides the instance config with the tenant configs if the tenant is tagged on the instance.
+   */
+  public static void applyTenantConfigs(String instanceId, ZkClient zkClient, String clusterName,
+      PinotConfiguration instanceConfig) {
+    ZNRecord instanceConfigZNRecord =
+        zkClient.readData(String.format(INSTANCE_CONFIG_ZK_PATH_TEMPLATE, clusterName, instanceId), true);
+    if (instanceConfigZNRecord == null) {
+      LOGGER.warn("Failed to find instance config for instance: {}, skipping overriding tenant configs", instanceId);
+      return;
+    }
+    InstanceConfig instanceZKConfig = new InstanceConfig(instanceConfigZNRecord);
+    Set<String> instanceTags = instanceZKConfig.getTags().stream()
+        .map(PinotConfiguration::relaxPropertyName)

Review Comment:
   Why do we want to relax the property name here? This is the ZK config, which should have the exact name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org