Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/31 02:14:44 UTC

[incubator-pinot] branch table_cache created (now f64b211)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch table_cache
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at f64b211  Refactor TableCache

This branch includes the following new commits:

     new f64b211  Refactor TableCache

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Refactor TableCache

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch table_cache
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f64b211c9980695ac96aba11f67b026aa1f6ec8c
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Jul 30 18:44:04 2020 -0700

    Refactor TableCache
---
 .../broker/broker/helix/HelixBrokerStarter.java    |  35 +-
 .../requesthandler/BaseBrokerRequestHandler.java   | 166 +++++-----
 .../SingleConnectionBrokerRequestHandler.java      |  11 +-
 .../LiteralOnlyBrokerRequestTest.java              |  16 +-
 .../pinot/common/utils/helix/TableCache.java       | 362 ++++++++++++++-------
 .../helix/core/PinotHelixResourceManager.java      |  51 +--
 .../pinot/controller/helix/TableCacheTest.java     | 168 ++++++++++
 pinot-spi/pom.xml                                  |   4 +
 pom.xml                                            |  10 +-
 9 files changed, 558 insertions(+), 265 deletions(-)
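
In short, this commit moves case-insensitivity out of the broker request
handler and into TableCache itself: the broker resolves the cluster-level
flag once and hands a shared TableCache to the request handler. A minimal
sketch of the new wiring, assuming a ZkHelixPropertyStore named
"propertyStore" and an already-resolved "caseInsensitive" flag (both are
stand-ins for the fields in the diff below):

    // Sketch only; mirrors the constructor signatures introduced below.
    TableCache tableCache = new TableCache(propertyStore, caseInsensitive);
    BrokerRequestHandler requestHandler =
        new SingleConnectionBrokerRequestHandler(brokerConf, routingManager,
            accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);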

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 3617a82..9ba8207 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -18,17 +18,14 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import com.google.common.collect.ImmutableList;
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.annotation.Nullable;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
@@ -61,15 +58,13 @@ import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableList;
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 @SuppressWarnings("unused")
 public class HelixBrokerStarter implements ServiceStartable {
@@ -106,7 +101,8 @@ public class HelixBrokerStarter implements ServiceStartable {
     this(brokerConf, clusterName, zkServer, null);
   }
 
-  public HelixBrokerStarter(PinotConfiguration brokerConf, String clusterName, String zkServer, @Nullable String brokerHost)
+  public HelixBrokerStarter(PinotConfiguration brokerConf, String clusterName, String zkServer,
+      @Nullable String brokerHost)
       throws Exception {
     _brokerConf = brokerConf;
     setupHelixSystemProperties();
@@ -190,17 +186,15 @@ public class HelixBrokerStarter implements ServiceStartable {
     _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
+
     // Fetch cluster level config from ZK
-    ConfigAccessor configAccessor = _spectatorHelixManager.getConfigAccessor();
     HelixConfigScope helixConfigScope =
         new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
-    Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays
+    Map<String, String> configMap = _helixAdmin.getConfig(helixConfigScope, Arrays
         .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY,
             Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY));
-    if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean
-        .parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) {
-      _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true);
-    }
+    boolean caseInsensitive = Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean
+        .parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
     String log2mStr = configMap.get(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY);
     if (log2mStr != null) {
       try {
@@ -233,9 +227,10 @@ public class HelixBrokerStarter implements ServiceStartable {
     queryQuotaManager.init(_spectatorHelixManager);
     // Initialize FunctionRegistry before starting the broker request handler
     FunctionRegistry.init();
+    TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
     _brokerRequestHandler =
         new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
-            _brokerMetrics, _propertyStore);
+            tableCache, _brokerMetrics);
 
     int brokerQueryPort = _brokerConf.getProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Helix.DEFAULT_BROKER_QUERY_PORT);
     LOGGER.info("Starting broker admin application on port: {}", brokerQueryPort);
@@ -311,8 +306,9 @@ public class HelixBrokerStarter implements ServiceStartable {
       }
     }
 
-    double minResourcePercentForStartup = _brokerConf.getProperty(
-        Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
+    double minResourcePercentForStartup = _brokerConf
+        .getProperty(Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
+            Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
 
     LOGGER.info("Registering service status handler");
     ServiceStatus.setServiceStatusCallback(_brokerId, new ServiceStatus.MultipleCallbackServiceStatusCallback(
@@ -395,7 +391,8 @@ public class HelixBrokerStarter implements ServiceStartable {
     return _brokerRequestHandler;
   }
 
-  public static HelixBrokerStarter getDefault() throws Exception {
+  public static HelixBrokerStarter getDefault()
+      throws Exception {
     Map<String, Object> properties = new HashMap<>();
 
     properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, 5001);
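
Note that the starter now reads the cluster config through HelixAdmin
rather than a ConfigAccessor, and honors both the current and the
deprecated case-insensitive keys. A condensed sketch of that lookup,
assuming local "helixAdmin" and "clusterName" variables in place of the
starter's fields:

    // Either key (current or deprecated) enables case-insensitive queries.
    HelixConfigScope scope = new HelixConfigScopeBuilder(
        HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
    Map<String, String> configMap = helixAdmin.getConfig(scope, Arrays.asList(
        Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
    boolean caseInsensitive =
        Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY))
            || Boolean.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
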
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index e442b5d..7bb8c7a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.broker.requesthandler;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,14 +35,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-
 import org.apache.calcite.sql.SqlKind;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -85,11 +85,6 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.util.concurrent.RateLimiter;
-
 
 @ThreadSafe
 public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
@@ -99,6 +94,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final RoutingManager _routingManager;
   protected final AccessControlFactory _accessControlFactory;
   protected final QueryQuotaManager _queryQuotaManager;
+  protected final TableCache _tableCache;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
@@ -115,36 +111,29 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   private final RateLimiter _numDroppedLogRateLimiter;
   private final AtomicInteger _numDroppedLog;
 
-  private final boolean _enableCaseInsensitive;
   private final boolean _enableQueryLimitOverride;
   private final int _defaultHllLog2m;
-  private final TableCache _tableCache;
 
   public BaseBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics) {
     _config = config;
     _routingManager = routingManager;
     _accessControlFactory = accessControlFactory;
     _queryQuotaManager = queryQuotaManager;
+    _tableCache = tableCache;
     _brokerMetrics = brokerMetrics;
 
-    _enableCaseInsensitive = _config.getProperty(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, false);
-    if (_enableCaseInsensitive) {
-      _tableCache = new TableCache(propertyStore);
-    } else {
-      _tableCache = null;
-    }
-    _defaultHllLog2m = _config
-        .getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
-
+    _defaultHllLog2m = _config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
+        CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
     _enableQueryLimitOverride = _config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
 
     _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit =
         config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
-    _queryLogLength = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
+    _queryLogLength =
+        config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
     _queryLogRateLimiter = RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
         Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
     _numDroppedLog = new AtomicInteger(0);
@@ -201,7 +190,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
                 e.getMessage());
       }
     }
-    updateQuerySource(brokerRequest);
+    updateTableName(brokerRequest);
     try {
       updateColumnNames(brokerRequest);
     } catch (Exception e) {
@@ -442,43 +431,60 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
    *
    * Only update TableName in QuerySource if there is no existing table in the format of [database_name].[table_name],
    * but only [table_name].
-   *
-   * @param brokerRequest
    */
-  private void updateQuerySource(BrokerRequest brokerRequest) {
+  private void updateTableName(BrokerRequest brokerRequest) {
     String tableName = brokerRequest.getQuerySource().getTableName();
-    // Check if table is in the format of [database_name].[table_name]
-    String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
-    // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name]
-    if (_enableCaseInsensitive) {
-      if (tableNameSplits.length < 2) {
-        brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName));
+
+    // Use TableCache to handle case-insensitive table name
+    if (_tableCache.isCaseInsensitive()) {
+      String actualTableName = _tableCache.getActualTableName(tableName);
+      if (actualTableName != null) {
+        setTableName(brokerRequest, actualTableName);
         return;
       }
-      if (_tableCache.containsTable(tableNameSplits[1]) && !_tableCache.containsTable(tableName)) {
-        // Use TableCache to check case insensitive table name.
-        brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1]));
+
+      // Check if table is in the format of [database_name].[table_name]
+      String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
+      if (tableNameSplits.length == 2) {
+        actualTableName = _tableCache.getActualTableName(tableNameSplits[1]);
+        if (actualTableName != null) {
+          setTableName(brokerRequest, actualTableName);
+          return;
+        }
       }
+
       return;
     }
-    if (tableNameSplits.length < 2) {
+
+    // Check if table is in the format of [database_name].[table_name]
+    String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
+    if (tableNameSplits.length != 2) {
       return;
     }
-    // Use RoutingManager to check case sensitive table name.
+
+    // Use RoutingManager to handle case-sensitive table name
+    // Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name]
     if (TableNameBuilder.isTableResource(tableName)) {
       if (_routingManager.routingExists(tableNameSplits[1]) && !_routingManager.routingExists(tableName)) {
-        brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+        setTableName(brokerRequest, tableNameSplits[1]);
       }
       return;
     }
+    if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1]))
+        && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) {
+      setTableName(brokerRequest, tableNameSplits[1]);
+      return;
+    }
     if (_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableNameSplits[1]))
         && !_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName))) {
-      brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
-      return;
+      setTableName(brokerRequest, tableNameSplits[1]);
     }
-    if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1]))
-        && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) {
-      brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+  }
+
+  private void setTableName(BrokerRequest brokerRequest, String tableName) {
+    brokerRequest.getQuerySource().setTableName(tableName);
+    if (brokerRequest.getPinotQuery() != null) {
+      brokerRequest.getPinotQuery().getDataSource().setTableName(tableName);
     }
   }
 
@@ -672,14 +678,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
    * Fixes the column names to the actual column names in the given broker request.
    */
   private void updateColumnNames(BrokerRequest brokerRequest) {
-    String tableName = brokerRequest.getQuerySource().getTableName();
-    //fix columns
+    String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
+    Map<String, String> columnNameMap =
+        _tableCache.isCaseInsensitive() ? _tableCache.getColumnNameMap(rawTableName) : null;
+
     if (brokerRequest.getFilterSubQueryMap() != null) {
       Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
       for (FilterQuery filterQuery : values) {
         if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
-          String expression = filterQuery.getColumn();
-          filterQuery.setColumn(fixColumnName(tableName, expression));
+          filterQuery.setColumn(fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap));
         }
       }
     }
@@ -688,14 +695,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
           // Always read from backward compatible api in AggregationFunctionUtils.
           List<String> arguments = AggregationFunctionUtils.getArguments(info);
-          arguments.replaceAll(e -> fixColumnName(tableName, e));
+          arguments.replaceAll(e -> fixColumnName(rawTableName, e, columnNameMap));
           info.setExpressions(arguments);
         }
       }
       if (brokerRequest.isSetGroupBy()) {
         List<String> expressions = brokerRequest.getGroupBy().getExpressions();
         for (int i = 0; i < expressions.size(); i++) {
-          expressions.set(i, fixColumnName(tableName, expressions.get(i)));
+          expressions.set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap));
         }
       }
     } else {
@@ -704,7 +711,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       for (int i = 0; i < selectionColumns.size(); i++) {
         String expression = selectionColumns.get(i);
         if (!expression.equals("*")) {
-          selectionColumns.set(i, fixColumnName(tableName, expression));
+          selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap));
         }
       }
     }
@@ -712,86 +719,87 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       List<SelectionSort> orderBy = brokerRequest.getOrderBy();
       for (SelectionSort selectionSort : orderBy) {
         String expression = selectionSort.getColumn();
-        selectionSort.setColumn(fixColumnName(tableName, expression));
+        selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap));
       }
     }
 
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
     if (pinotQuery != null) {
-      pinotQuery.getDataSource().setTableName(tableName);
       for (Expression expression : pinotQuery.getSelectList()) {
-        fixColumnName(tableName, expression);
+        fixColumnName(rawTableName, expression, columnNameMap);
       }
       Expression filterExpression = pinotQuery.getFilterExpression();
       if (filterExpression != null) {
-        fixColumnName(tableName, filterExpression);
+        fixColumnName(rawTableName, filterExpression, columnNameMap);
       }
       List<Expression> groupByList = pinotQuery.getGroupByList();
       if (groupByList != null) {
         for (Expression expression : groupByList) {
-          fixColumnName(tableName, expression);
+          fixColumnName(rawTableName, expression, columnNameMap);
         }
       }
       List<Expression> orderByList = pinotQuery.getOrderByList();
       if (orderByList != null) {
         for (Expression expression : orderByList) {
-          fixColumnName(tableName, expression);
+          fixColumnName(rawTableName, expression, columnNameMap);
         }
       }
       Expression havingExpression = pinotQuery.getHavingExpression();
       if (havingExpression != null) {
-        fixColumnName(tableName, havingExpression);
+        fixColumnName(rawTableName, havingExpression, columnNameMap);
       }
     }
   }
 
-  private String fixColumnName(String tableNameWithType, String expression) {
+  private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap) {
     TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-    fixColumnName(tableNameWithType, expressionTree);
+    fixColumnName(rawTableName, expressionTree, columnNameMap);
     return expressionTree.toString();
   }
 
-  private void fixColumnName(String tableNameWithType, TransformExpressionTree expression) {
+  private void fixColumnName(String rawTableName, TransformExpressionTree expression,
+      @Nullable Map<String, String> columnNameMap) {
     TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType();
     if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-      String identifier = expression.getValue();
-      expression.setValue(getActualColumnName(tableNameWithType, identifier));
+      expression.setValue(getActualColumnName(rawTableName, expression.getValue(), columnNameMap));
     } else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) {
       for (TransformExpressionTree child : expression.getChildren()) {
-        fixColumnName(tableNameWithType, child);
+        fixColumnName(rawTableName, child, columnNameMap);
       }
     }
   }
 
-  private void fixColumnName(String tableNameWithType, Expression expression) {
+  private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap) {
     ExpressionType expressionType = expression.getType();
     if (expressionType == ExpressionType.IDENTIFIER) {
       Identifier identifier = expression.getIdentifier();
-      identifier.setName(getActualColumnName(tableNameWithType, identifier.getName()));
+      identifier.setName(getActualColumnName(rawTableName, identifier.getName(), columnNameMap));
     } else if (expressionType == ExpressionType.FUNCTION) {
       for (Expression operand : expression.getFunctionCall().getOperands()) {
-        fixColumnName(tableNameWithType, operand);
+        fixColumnName(rawTableName, operand, columnNameMap);
       }
     }
   }
 
-  private String getActualColumnName(String tableNameWithType, String columnName) {
+  private String getActualColumnName(String rawTableName, String columnName,
+      @Nullable Map<String, String> columnNameMap) {
+    // Check if column is in the format of [table_name].[column_name]
     String[] splits = StringUtils.split(columnName, ".", 2);
-    if (_enableCaseInsensitive) {
-      if (splits.length == 2) {
-        if (TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(splits[0])) {
-          return _tableCache.getActualColumnName(tableNameWithType, splits[1]);
-        }
+    if (_tableCache.isCaseInsensitive()) {
+      if (splits.length == 2 && rawTableName.equalsIgnoreCase(splits[0])) {
+        columnName = splits[1];
+      }
+      if (columnNameMap != null) {
+        return columnNameMap.getOrDefault(columnName, columnName);
+      } else {
+        return columnName;
       }
-      return _tableCache.getActualColumnName(tableNameWithType, columnName);
     } else {
-      if (splits.length == 2) {
-        if (TableNameBuilder.extractRawTableName(tableNameWithType).equals(splits[0])) {
-          return splits[1];
-        }
+      if (splits.length == 2 && rawTableName.equals(splits[0])) {
+        columnName = splits[1];
       }
+      return columnName;
     }
-    return columnName;
   }
 
   private static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) {
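
The column-fixing path above now resolves identifiers through a per-table
map from lower-case to actual column names, fetched once per request,
instead of querying TableCache for every column. A small illustration of
the lookup behavior (the table and column names here are hypothetical):

    // Illustration only: keys are lower-cased when the schema is cached.
    Map<String, String> columnNameMap = new HashMap<>();
    columnNameMap.put("playername", "playerName");
    // A case-insensitive hit resolves to the actual column name...
    String fixed = columnNameMap.getOrDefault("PlayerName".toLowerCase(), "PlayerName");
    // ...and an unknown column falls back to itself, matching the
    // getOrDefault fallback in getActualColumnName.
    String kept = columnNameMap.getOrDefault("unknownColumn", "unknownColumn");
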
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 420177a..1f67d88 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -21,12 +21,8 @@ package org.apache.pinot.broker.requesthandler;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -41,6 +37,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
 import org.apache.pinot.core.transport.QueryRouter;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -59,9 +56,9 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
   private final QueryRouter _queryRouter;
 
   public SingleConnectionBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics,
-      ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    super(config, routingManager, accessControlFactory, queryQuotaManager, brokerMetrics, propertyStore);
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics) {
+    super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 0dbb2e2..18f9e86 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.broker.requesthandler;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Random;
-
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -30,10 +32,6 @@ import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 public class LiteralOnlyBrokerRequestTest {
   private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -92,8 +90,8 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandler()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null,
-            new BrokerMetrics("", new MetricsRegistry(), false), null);
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, null,
+            new BrokerMetrics("", new MetricsRegistry(), false));
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
     RANDOM.nextBytes(randBytes);
@@ -119,8 +117,8 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandlerWithAsFunction()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null,
-            new BrokerMetrics("", new MetricsRegistry(), false), null);
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, null,
+            new BrokerMetrics("", new MetricsRegistry(), false));
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = new ObjectMapper().readTree(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4e3dd4d..dbd360d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -18,19 +18,23 @@
  */
 package org.apache.pinot.common.utils.helix;
 
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -38,170 +42,284 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- *  Caches table config and schema of a table.
- *  At the start - loads all the table configs and schemas in map.
- *  sets up a zookeeper listener that watches for any change and updates the cache.
- *  TODO: optimize to load only changed table configs/schema on a callback.
- *  TODO: Table deletes are not handled as of now
- *  Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map for case-insensitive queries.
  */
 public class TableCache {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableCache.class);
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+  private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+  private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+  private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
 
-  private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
-  private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final boolean _caseInsensitive;
+  // For case insensitive, key is lower case table name, value is actual table name
+  private final Map<String, String> _tableNameMap;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  TableConfigChangeListener _tableConfigChangeListener;
-  SchemaChangeListener _schemaChangeListener;
+  // Key is table name with type suffix
+  private final TableConfigChangeListener _tableConfigChangeListener = new TableConfigChangeListener();
+  private final Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
+  // Key is raw table name
+  private final SchemaChangeListener _schemaChangeListener = new SchemaChangeListener();
+  private final Map<String, SchemaInfo> _schemaInfoMap = new ConcurrentHashMap<>();
 
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean caseInsensitive) {
     _propertyStore = propertyStore;
-    _schemaChangeListener = new SchemaChangeListener();
-    _schemaChangeListener.refresh();
-    _tableConfigChangeListener = new TableConfigChangeListener();
-    _tableConfigChangeListener.refresh();
+    _caseInsensitive = caseInsensitive;
+    _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+    synchronized (_tableConfigChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing changes
+      _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, _tableConfigChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String tableNameWithType : tables) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+        addTableConfigs(pathsToAdd);
+      }
+    }
+
+    synchronized (_schemaChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing changes
+      _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, _schemaChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String rawTableName : tables) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+        addSchemas(pathsToAdd);
+      }
+    }
+
+    LOGGER.info("Initialized TableCache with caseInsensitive: {}", caseInsensitive);
   }
 
-  public String getActualTableName(String tableName) {
-    return _tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), tableName);
+  /**
+   * Returns {@code true} if the TableCache is case-insensitive, {@code false} otherwise.
+   */
+  public boolean isCaseInsensitive() {
+    return _caseInsensitive;
   }
 
-  public boolean containsTable(String tableName) {
-    return _tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+  /**
+   * For case-insensitive only, returns the actual table name for the given case-insensitive table name (with or without
+   * type suffix), or {@code null} if the table does not exist.
+   */
+  @Nullable
+  public String getActualTableName(String caseInsensitiveTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not case-insensitive");
+    return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
   }
 
-  public String getActualColumnName(String tableName, String columnName) {
-    String schemaName = _tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
-    if (schemaName != null) {
-      String actualColumnName = _schemaChangeListener.getColumnName(schemaName, columnName);
-      // If actual column name doesn't exist in schema, then return the origin column name.
-      if (actualColumnName == null) {
-        return columnName;
+  /**
+   * For case-insensitive only, returns a map from lower case column name to actual column name for the given table, or
+   * {@code null} if the table schema does not exist.
+   */
+  @Nullable
+  public Map<String, String> getColumnNameMap(String rawTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not case-insensitive");
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNameMap : null;
+  }
+
+  /**
+   * Returns the table config for the given table, or {@code null} if it does not exist.
+   */
+  @Nullable
+  public TableConfig getTableConfig(String tableNameWithType) {
+    return _tableConfigMap.get(tableNameWithType);
+  }
+
+  /**
+   * Returns the schema for the given table, or {@code null} if it does not exist.
+   */
+  @Nullable
+  public Schema getSchema(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._schema : null;
+  }
+
+  private void addTableConfigs(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding table config for ZNRecord: {}", znRecord.getId(), e);
+        }
       }
-      return actualColumnName;
     }
-    return columnName;
   }
 
-  public TableConfig getTableConfig(String tableName) {
-    return _tableConfigChangeListener._tableConfigMap.get(tableName);
+  private void putTableConfig(ZNRecord znRecord)
+      throws IOException {
+    TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+    String tableNameWithType = tableConfig.getTableName();
+    _tableConfigMap.put(tableNameWithType, tableConfig);
+    if (_caseInsensitive) {
+      _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+    }
   }
 
-  class TableConfigChangeListener implements IZkChildListener, IZkDataListener {
-
-    Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
-    Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-    Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any changes
-        _propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, _tableConfigChangeListener);
-        _propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, _tableConfigChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, null, AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
-              String tableNameWithType = tableConfig.getTableName();
-              _tableConfigMap.put(tableNameWithType, tableConfig);
-              String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
-              //create case insensitive mapping
-              _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
-              _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
-              //create case insensitive mapping between table name and schemaName
-              _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(), rawTableName);
-              _table2SchemaConfigMap.put(rawTableName.toLowerCase(), rawTableName);
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading table config for: {}: {}", znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
+  private void removeTableConfig(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+    String tableNameWithType = path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+    _tableConfigMap.remove(tableNameWithType);
+    if (_caseInsensitive) {
+      _tableNameMap.remove(tableNameWithType.toLowerCase());
+      String lowerCaseRawTableName = TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
+        }
+      } else {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading tableconfigs", e);
-        //ignore
       }
     }
+  }
+
+  private void addSchemas(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding schema for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putSchema(ZNRecord znRecord)
+      throws IOException {
+    Schema schema = SchemaUtils.fromZNRecord(znRecord);
+    String rawTableName = schema.getSchemaName();
+    if (_caseInsensitive) {
+      Map<String, String> columnNameMap = new HashMap<>();
+      for (String columnName : schema.getColumnNames()) {
+        columnNameMap.put(columnName.toLowerCase(), columnName);
+      }
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+    } else {
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+    }
+  }
+
+  private void removeSchema(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+    String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+    _schemaInfoMap.remove(rawTableName);
+  }
+
+  private class TableConfigChangeListener implements IZkChildListener, IZkDataListener {
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleChildChange(String path, List<String> tables) {
+      if (CollectionUtils.isEmpty(tables)) {
+        return;
+      }
+
+      // Only process new added table configs. Changed/removed table configs are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String tableNameWithType : tables) {
+        if (!_tableConfigMap.containsKey(tableNameWithType)) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addTableConfigs(pathsToAdd);
+      }
     }
 
     @Override
-    public void handleDataChange(String s, Object o)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing table config for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
     }
 
     @Override
-    public void handleDataDeleted(String s)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative path to the property store.
+      String tableNameWithType = path.substring(path.lastIndexOf('/') + 1);
+      removeTableConfig(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
     }
   }
 
-  class SchemaChangeListener implements IZkChildListener, IZkDataListener {
-    Map<String, Map<String, String>> _schemaColumnMap = new ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any changes between reading and setting the watcher again
-        _propertyStore.subscribeChildChanges(PROPERTYSTORE_SCHEMAS_PREFIX, _schemaChangeListener);
-        _propertyStore.subscribeDataChanges(PROPERTYSTORE_SCHEMAS_PREFIX, _schemaChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_SCHEMAS_PREFIX, null, AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              Schema schema = SchemaUtils.fromZNRecord(znRecord);
-              String schemaNameLowerCase = schema.getSchemaName().toLowerCase();
-              Collection<FieldSpec> allFieldSpecs = schema.getAllFieldSpecs();
-              ConcurrentHashMap<String, String> columnNameMap = new ConcurrentHashMap<>();
-              _schemaColumnMap.put(schemaNameLowerCase, columnNameMap);
-              for (FieldSpec fieldSpec : allFieldSpecs) {
-                columnNameMap.put(fieldSpec.getName().toLowerCase(), fieldSpec.getName());
-              }
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading schema for: {}: {}", znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading schemas", e);
-        //ignore
+  private class SchemaChangeListener implements IZkChildListener, IZkDataListener {
+
+    @Override
+    public synchronized void handleChildChange(String path, List<String> tables) {
+      if (CollectionUtils.isEmpty(tables)) {
+        return;
       }
-    }
 
-    String getColumnName(String schemaName, String columnName) {
-      Map<String, String> columnNameMap = _schemaColumnMap.get(schemaName.toLowerCase());
-      if (columnNameMap != null) {
-        return columnNameMap.get(columnName.toLowerCase());
+      // Only process new added schemas. Changed/removed schemas are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String rawTableName : tables) {
+        if (!_schemaInfoMap.containsKey(rawTableName)) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addSchemas(pathsToAdd);
       }
-      return columnName;
     }
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing schema for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
     }
 
     @Override
-    public void handleDataChange(String s, Object o)
-        throws Exception {
-      refresh();
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative path to the property store.
+      String rawTableName = path.substring(path.lastIndexOf('/') + 1);
+      removeSchema(SCHEMA_PATH_PREFIX + rawTableName);
     }
+  }
 
-    @Override
-    public void handleDataDeleted(String s)
-        throws Exception {
-      refresh();
+  private static class SchemaInfo {
+    final Schema _schema;
+    final Map<String, String> _columnNameMap;
+
+    private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
+      _schema = schema;
+      _columnNameMap = columnNameMap;
     }
   }
 }
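
A pattern worth calling out in the rewritten listeners is
subscribe-before-read: the cache registers the ZK watch first and only
then reads the current state, so a change landing between the two steps
still triggers a callback instead of being lost. A stripped-down sketch of
the same ordering ("PARENT_PATH" and "listener" are placeholders):

    // Subscribe the child watch first so no concurrent change is missed.
    propertyStore.subscribeChildChanges(PARENT_PATH, listener);
    List<String> children = propertyStore.getChildNames(PARENT_PATH, AccessOption.PERSISTENT);
    if (children != null) {
      List<String> paths = new ArrayList<>(children.size());
      for (String child : children) {
        String path = PARENT_PATH + "/" + child;
        propertyStore.subscribeDataChanges(path, listener);  // data watch before the read
        paths.add(path);
      }
      // Bulk-read after the watches are in place; entries deleted in
      // between come back as null and are skipped by the cache.
      List<ZNRecord> records = propertyStore.get(paths, null, AccessOption.PERSISTENT);
    }
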
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7d5bd61..5cb528c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -27,6 +27,7 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -204,7 +205,15 @@ public class PinotHelixResourceManager {
     addInstanceGroupTagIfNeeded();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
-    _tableCache = new TableCache(_propertyStore);
+
+    // Initialize TableCache
+    HelixConfigScope helixConfigScope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
+    Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope,
+        Arrays.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
+    boolean caseInsensitive = Boolean.parseBoolean(configs.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean
+        .parseBoolean(configs.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
+    _tableCache = new TableCache(_propertyStore, caseInsensitive);
   }
 
   /**
@@ -467,28 +476,23 @@ public class PinotHelixResourceManager {
    * @return tableName actually defined in Pinot (matches case) and exists ,else, return the input value
    */
   public String getActualTableName(String tableName) {
-    return _tableCache.getActualTableName(tableName);
-  }
-
-  /**
-   *  Given a column name in any case, returns the column name as defined in Schema
-   *  If table has no schema, it just returns the input value
-   * @param tableName
-   * @param columnName
-   * @return
-   */
-  public String getActualColumnName(String tableName, String columnName) {
-    return _tableCache.getActualColumnName(tableName, columnName);
+    if (_tableCache.isCaseInsensitive()) {
+      String actualTableName = _tableCache.getActualTableName(tableName);
+      return actualTableName != null ? actualTableName : tableName;
+    } else {
+      return tableName;
+    }
   }
 
   /**
-   * Given a table name in any case, returns crypter class name defined in table config
-   * @param tableName table name in any case
+   * Returns the crypter class name defined in the table config for the given table.
+   *
+   * @param tableNameWithType Table name with type suffix
    * @return crypter class name
    */
-  public String getCrypterClassNameFromTableConfig(String tableName) {
-    TableConfig tableConfig = _tableCache.getTableConfig(tableName);
-    Preconditions.checkNotNull(tableConfig, "Table config is not available for table '%s'", tableName);
+  public String getCrypterClassNameFromTableConfig(String tableNameWithType) {
+    TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
+    Preconditions.checkNotNull(tableConfig, "Table config is not available for table '%s'", tableNameWithType);
     return tableConfig.getValidationConfig().getCrypterClassName();
   }
 
@@ -1761,8 +1765,7 @@ public class PinotHelixResourceManager {
     propToUpdate.put(Helix.QUERY_RATE_LIMIT_DISABLED, Boolean.toString("DISABLE".equals(state)));
     HelixConfigScope scope =
         new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, _helixClusterName)
-            .forParticipant(brokerInstanceName)
-            .build();
+            .forParticipant(brokerInstanceName).build();
     _helixAdmin.setConfig(scope, propToUpdate);
   }
 
@@ -2264,10 +2267,10 @@ public class PinotHelixResourceManager {
           LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
 
           // Check that any segment from 'segmentsFrom' does not appear twice.
-          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String
-              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
-                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
-                  lineageEntry.getSegmentsFrom(), segmentsFrom));
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String.format(
+              "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                  + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+              lineageEntry.getSegmentsFrom(), segmentsFrom));
 
           // Check that merged segments name cannot be the same.
           Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
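
One behavioral change surfaces here: TableCache.getActualTableName now
returns null on a miss (and rejects calls in case-sensitive mode), so the
pass-through fallback lives in the caller. A usage sketch with stand-in
"tableCache" and "tableName" variables:

    // The cache returns null for an unknown table; fall back to the input.
    String resolved = tableName;
    if (tableCache.isCaseInsensitive()) {
      String actualTableName = tableCache.getActualTableName(tableName);
      if (actualTableName != null) {
        resolved = actualTableName;
      }
    }
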
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
new file mode 100644
index 0000000..9ba206d
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TableCacheTest extends ControllerTest {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+  }
+
+  @Test
+  public void testTableCache()
+      throws Exception {
+    TableCache tableCache = new TableCache(_propertyStore, true);
+
+    assertTrue(tableCache.isCaseInsensitive());
+    assertNull(tableCache.getActualTableName("testTable"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertNull(tableCache.getTableConfig("testTable_OFFLINE"));
+    assertNull(tableCache.getSchema("testTable"));
+
+    // Add a table config
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    _helixResourceManager.addTable(tableConfig);
+    // Wait for at most 10 seconds for the callback to add the table config to the cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getTableConfig("testTable_OFFLINE") != null, 10_000L,
+        "Failed to add the table config to the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertNull(tableCache.getSchema("testTable"));
+
+    // Update the table config
+    tableConfig.getIndexingConfig().setCreateInvertedIndexDuringSegmentGeneration(true);
+    _helixResourceManager.updateTableConfig(tableConfig);
+    // Wait for at most 10 seconds for the callback to update the table config in the cache
+    // NOTE: The table config should never be null during the transition
+    TestUtils.waitForCondition(
+        aVoid -> Preconditions.checkNotNull(tableCache.getTableConfig("testTable_OFFLINE")).equals(tableConfig),
+        10_000L, "Failed to update the table config in the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertNull(tableCache.getSchema("testTable"));
+
+    // Add a schema
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("testColumn", DataType.INT)
+            .build();
+    _helixResourceManager.addSchema(schema, false);
+    // Wait for at most 10 seconds for the callback to add the schema to the cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema("testTable") != null, 10_000L,
+        "Failed to add the schema to the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    assertEquals(tableCache.getColumnNameMap("testTable"), Collections.singletonMap("testcolumn", "testColumn"));
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Update the schema
+    schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
+    _helixResourceManager.updateSchema(schema, false);
+    // Wait for at most 10 seconds for the callback to update the schema in the cache
+    // NOTE: The schema should never be null during the transition
+    TestUtils.waitForCondition(aVoid -> Preconditions.checkNotNull(tableCache.getSchema("testTable")).equals(schema),
+        10_000L, "Failed to update the schema in the cache");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE"), "testTable");
+    assertEquals(tableCache.getActualTableName("TeStTaBlE_oFfLiNe"), "testTable_OFFLINE");
+    assertNull(tableCache.getActualTableName("testTable_REALTIME"));
+    Map<String, String> expectedColumnMap = new HashMap<>();
+    expectedColumnMap.put("testcolumn", "testColumn");
+    expectedColumnMap.put("newcolumn", "newColumn");
+    assertEquals(tableCache.getColumnNameMap("testTable"), expectedColumnMap);
+    assertEquals(tableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Create a new case-sensitive TableCache which should load the existing table config and schema
+    TableCache caseSensitiveTableCache = new TableCache(_propertyStore, false);
+    assertFalse(caseSensitiveTableCache.isCaseInsensitive());
+    // Getting the actual table name or the column name map should throw an exception for a case-sensitive TableCache
+    try {
+      caseSensitiveTableCache.getActualTableName("testTable");
+      fail();
+    } catch (Exception e) {
+      // Expected
+    }
+    try {
+      caseSensitiveTableCache.getColumnNameMap("testTable");
+      fail();
+    } catch (Exception e) {
+      // Expected
+    }
+    assertEquals(caseSensitiveTableCache.getTableConfig("testTable_OFFLINE"), tableConfig);
+    assertEquals(caseSensitiveTableCache.getSchema("testTable"), schema);
+
+    // Remove the table config
+    _helixResourceManager.deleteOfflineTable("testTable");
+    // Wait for at most 10 seconds for the callback to remove the table config from the cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getTableConfig("testTable_OFFLINE") == null, 10_000L,
+        "Failed to remove the table config from the cache");
+    // Case-insensitive table names are resolved based on the table config instead of the schema
+    assertNull(tableCache.getActualTableName("testTable"));
+    assertEquals(tableCache.getColumnNameMap("testTable"), expectedColumnMap);
+    assertNull(tableCache.getTableConfig("testTable_OFFLINE"));
+    assertEquals(tableCache.getSchema("testTable"), schema);
+
+    // Remove the schema
+    _helixResourceManager.deleteSchema(schema);
+    // Wait for at most 10 seconds for the callback to remove the schema from the cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema("testTable") == null, 10_000L,
+        "Failed to remove the schema from the cache");
+    assertNull(tableCache.getActualTableName("testTable"));
+    assertNull(tableCache.getColumnNameMap("testTable"));
+    assertNull(tableCache.getTableConfig("testTable_OFFLINE"));
+    assertNull(tableCache.getSchema("testTable"));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopController();
+    stopZk();
+  }
+}
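
The test above doubles as documentation of the TableCache surface. Below is a
minimal usage sketch built only from the methods it exercises (the constructor
plus getActualTableName, getTableConfig, getSchema, and getColumnNameMap); the
ZkHelixPropertyStore<ZNRecord> parameter type is assumed from the ControllerTest
fixture (_propertyStore), and the "myTable" names are illustrative:

import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

public final class TableCacheUsageSketch {

  static void resolveQueryNames(ZkHelixPropertyStore<ZNRecord> propertyStore) {
    // Case-insensitive cache: raw table names from queries resolve to their
    // canonical names once the table config has been loaded.
    TableCache tableCache = new TableCache(propertyStore, true);

    String rawName = tableCache.getActualTableName("mytable");            // e.g. "myTable"
    String offlineName = tableCache.getActualTableName("mytable_offline"); // e.g. "myTable_OFFLINE"
    if (rawName != null && offlineName != null) {
      // Table configs are keyed by table name with type; schemas by raw name.
      TableConfig tableConfig = tableCache.getTableConfig(offlineName);
      Schema schema = tableCache.getSchema(rawName);
      // Lower-cased column name -> actual column name; null until a schema exists.
      System.out.println(tableCache.getColumnNameMap(rawName));
      System.out.println(tableConfig + " / " + schema);
    }
  }
}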
diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index ab433e7..9e4535e 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -92,6 +92,10 @@
       <artifactId>commons-lang3</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/pom.xml b/pom.xml
index c59189a..a3f23b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -402,11 +402,11 @@
         <artifactId>commons-lang3</artifactId>
         <version>3.5</version>
       </dependency>
-    <dependency>
-      <groupId>commons-collections</groupId>
-      <artifactId>commons-collections</artifactId>
-      <version>3.2.1</version>
-    </dependency>
+      <dependency>
+        <groupId>commons-collections</groupId>
+        <artifactId>commons-collections</artifactId>
+        <version>3.2.1</version>
+      </dependency>
       <dependency>
         <groupId>commons-configuration</groupId>
         <artifactId>commons-configuration</artifactId>

