You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "vvivekiyer (via GitHub)" <gi...@apache.org> on 2023/04/19 14:30:51 UTC

[GitHub] [pinot] vvivekiyer commented on a diff in pull request #10617: [multistage] Make Intermediate Stage Worker Assignment Tenant Aware

vvivekiyer commented on code in PR #10617:
URL: https://github.com/apache/pinot/pull/10617#discussion_r1171435014


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -744,4 +769,63 @@ InstanceSelector.SelectionResult calculateRouting(BrokerRequest brokerRequest, l
       }
     }
   }
+
+  private void buildTableTenantServerMap(String tableNameWithType, TableConfig tableConfig) {
+    LOGGER.info("buildTableTenantServerMap");
+    String serverTag = getServerTagForTable(tableNameWithType, tableConfig);
+    List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs(_helixManager);
+    List<InstanceConfig> instanceConfigsWithTag = HelixHelper.getInstancesConfigsWithTag(allInstanceConfigs, serverTag);
+    Map<String, ServerInstance> serverInstances = new HashMap<>();
+    for (InstanceConfig serverInstanceConfig : instanceConfigsWithTag) {
+      LOGGER.info("Building instances. table={}, host={}, instance={}", tableNameWithType,
+          serverInstanceConfig.getHostName(), serverInstanceConfig.getInstanceName());
+      serverInstances.put(serverInstanceConfig.getInstanceName(), new ServerInstance(serverInstanceConfig));
+    }
+    _tableTenantServersMap.put(tableNameWithType, serverInstances);
+  }
+
+  private void addNewServerToTableTenantServerMap(String instanceId, ServerInstance serverInstance,
+      InstanceConfig instanceConfig) {
+    LOGGER.info("addNewServerToTableTenantServerMap");
+    List<String> tags = instanceConfig.getTags();
+
+    for (Map.Entry<String, Map<String, ServerInstance>> entry : _tableTenantServersMap.entrySet()) {
+      String tableNameWithType = entry.getKey();
+      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      String tableServerTag = getServerTagForTable(tableNameWithType, tableConfig);
+
+      Map<String, ServerInstance> tenantServerMap = entry.getValue();
+
+      LOGGER.info("Iterating for table={}, tag={}, server_instance_id={}, host={}", tableNameWithType, tableServerTag,
+          instanceId, serverInstance.getHostname());
+
+      if (tags.contains(tableServerTag) && !tenantServerMap.containsKey(instanceId)) {
+        LOGGER.info("Adding instance={}", instanceId);
+        tenantServerMap.put(instanceId, serverInstance);
+      }
+    }
+  }
+
+  private String getServerTagForTable(String tableNameWithType, TableConfig tableConfig) {
+    String serverTenantName = tableConfig.getTenantConfig().getServer();
+    String serverTag;
+    if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+      serverTag = TagNameUtils.getOfflineTagForTenant(serverTenantName);
+    } else {
+      // Realtime table
+      serverTag = TagNameUtils.getRealtimeTagForTenant(serverTenantName);
+    }
+
+    LOGGER.info("Server tag for table={}, tag={}, servertenant={}", tableNameWithType, serverTag, serverTenantName);
+    return serverTag;
+  }
+
+
+  private void deleteServerFromTableTenantServerMap(String server) {
+    for (Map.Entry<String, Map<String, ServerInstance>> entry : _tableTenantServersMap.entrySet()) {
+      if (entry.getValue().remove(server) != null) {

Review Comment:
   We are iterating over the outer map and deleting entries from the inner map, so we would not hit this exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org