You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/31 19:54:37 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5780: TableCache Enhancement

Jackie-Jiang commented on a change in pull request #5780:
URL: https://github.com/apache/incubator-pinot/pull/5780#discussion_r463803114



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
##########
@@ -18,190 +18,308 @@
  */
 package org.apache.pinot.common.utils.helix;
 
-import java.util.Collection;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- *  Caches table config and schema of a table.
- *  At the start - loads all the table configs and schemas in map.
- *  sets up a zookeeper listener that watches for any change and updates the cache.
- *  TODO: optimize to load only changed table configs/schema on a callback.
- *  TODO: Table deletes are not handled as of now
- *  Goal is to eventually grow this into a PinotClusterDataAccessor
+ * The {@code TableCache} caches all the table configs and schemas within the cluster, and listens on ZK changes to keep
+ * them in sync. It also maintains the table name map and the column name map for case-insensitive queries.
  */
 public class TableCache {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableCache.class);
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+  private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
+  private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = "_offline";
+  private static final String LOWER_CASE_REALTIME_TABLE_SUFFIX = "_realtime";
 
-  private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
-  private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final boolean _caseInsensitive;
+  // For case insensitive, key is lower case table name, value is actual table name
+  private final Map<String, String> _tableNameMap;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  TableConfigChangeListener _tableConfigChangeListener;
-  SchemaChangeListener _schemaChangeListener;
+  // Key is table name with type suffix
+  private final TableConfigChangeListener _tableConfigChangeListener = new TableConfigChangeListener();
+  private final Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
+  // Key is raw table name
+  private final SchemaChangeListener _schemaChangeListener = new SchemaChangeListener();
+  private final Map<String, SchemaInfo> _schemaInfoMap = new ConcurrentHashMap<>();
 
-  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean caseInsensitive) {
     _propertyStore = propertyStore;
-    _schemaChangeListener = new SchemaChangeListener();
-    _schemaChangeListener.refresh();
-    _tableConfigChangeListener = new TableConfigChangeListener();
-    _tableConfigChangeListener.refresh();
+    _caseInsensitive = caseInsensitive;
+    _tableNameMap = caseInsensitive ? new ConcurrentHashMap<>() : null;
+
+    synchronized (_tableConfigChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing changes
+      _propertyStore.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, _tableConfigChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String tableNameWithType : tables) {
+          pathsToAdd.add(TABLE_CONFIG_PATH_PREFIX + tableNameWithType);
+        }
+        addTableConfigs(pathsToAdd);
+      }
+    }
+
+    synchronized (_schemaChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing changes
+      _propertyStore.subscribeChildChanges(SCHEMA_PARENT_PATH, _schemaChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(SCHEMA_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = new ArrayList<>(tables.size());
+        for (String rawTableName : tables) {
+          pathsToAdd.add(SCHEMA_PATH_PREFIX + rawTableName);
+        }
+        addSchemas(pathsToAdd);
+      }
+    }
+
+    LOGGER.info("Initialized TableCache with caseInsensitive: {}", caseInsensitive);
   }
 
-  public String getActualTableName(String tableName) {
-    return _tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), tableName);
+  /**
+   * Returns {@code true} if the TableCache is case-insensitive, {@code false} otherwise.
+   */
+  public boolean isCaseInsensitive() {
+    return _caseInsensitive;
   }
 
-  public boolean containsTable(String tableName) {
-    return _tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase());
+  /**
+   * For case-insensitive only, returns the actual table name for the given case-insensitive table name (with or without
+   * type suffix), or {@code null} if the table does not exist.
+   */
+  @Nullable
+  public String getActualTableName(String caseInsensitiveTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not case-insensitive");
+    return _tableNameMap.get(caseInsensitiveTableName.toLowerCase());
   }
 
-  public String getActualColumnName(String tableName, String columnName) {
-    String schemaName = _tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
-    if (schemaName != null) {
-      String actualColumnName = _schemaChangeListener.getColumnName(schemaName, columnName);
-      // If actual column name doesn't exist in schema, then return the origin column name.
-      if (actualColumnName == null) {
-        return columnName;
+  /**
+   * For case-insensitive only, returns a map from lower case column name to actual column name for the given table, or
+   * {@code null} if the table schema does not exist.
+   */
+  @Nullable
+  public Map<String, String> getColumnNameMap(String rawTableName) {
+    Preconditions.checkState(_caseInsensitive, "TableCache is not case-insensitive");
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNameMap : null;
+  }
+
+  /**
+   * Returns the table config for the given table, or {@code null} if it does not exist.
+   */
+  @Nullable
+  public TableConfig getTableConfig(String tableNameWithType) {
+    return _tableConfigMap.get(tableNameWithType);
+  }
+
+  /**
+   * Returns the schema for the given table, or {@code null} if it does not exist.
+   */
+  @Nullable
+  public Schema getSchema(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._schema : null;
+  }
+
+  private void addTableConfigs(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _tableConfigChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding table config for ZNRecord: {}", znRecord.getId(), e);
+        }
       }
-      return actualColumnName;
     }
-    return columnName;
   }
 
-  public TableConfig getTableConfig(String tableName) {
-    return _tableConfigChangeListener._tableConfigMap.get(tableName);
+  private void putTableConfig(ZNRecord znRecord)
+      throws IOException {
+    TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+    String tableNameWithType = tableConfig.getTableName();
+    _tableConfigMap.put(tableNameWithType, tableConfig);
+    if (_caseInsensitive) {
+      _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
+      String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+      _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
+    }
   }
 
-  class TableConfigChangeListener implements IZkChildListener, IZkDataListener {
-
-    Map<String, TableConfig> _tableConfigMap = new ConcurrentHashMap<>();
-    Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
-    Map<String, String> _table2SchemaConfigMap = new ConcurrentHashMap<>();
-
-    public synchronized void refresh() {
-      try {
-        //always subscribe first before reading, so that we dont miss any changes
-        _propertyStore.subscribeChildChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, _tableConfigChangeListener);
-        _propertyStore.subscribeDataChanges(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, _tableConfigChangeListener);
-        List<ZNRecord> children =
-            _propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, null, AccessOption.PERSISTENT);
-        if (children != null) {
-          for (ZNRecord znRecord : children) {
-            try {
-              TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
-              String tableNameWithType = tableConfig.getTableName();
-              _tableConfigMap.put(tableNameWithType, tableConfig);
-              String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
-              //create case insensitive mapping
-              _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
-              _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
-              //create case insensitive mapping between table name and schemaName
-              _table2SchemaConfigMap.put(tableNameWithType.toLowerCase(), rawTableName);
-              _table2SchemaConfigMap.put(rawTableName.toLowerCase(), rawTableName);
-            } catch (Exception e) {
-              LOGGER.warn("Exception loading table config for: {}: {}", znRecord.getId(), e.getMessage());
-              //ignore
-            }
-          }
+  private void removeTableConfig(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _tableConfigChangeListener);
+    String tableNameWithType = path.substring(TABLE_CONFIG_PATH_PREFIX.length());
+    _tableConfigMap.remove(tableNameWithType);
+    if (_caseInsensitive) {
+      _tableNameMap.remove(tableNameWithType.toLowerCase());
+      String lowerCaseRawTableName = TableNameBuilder.extractRawTableName(tableNameWithType).toLowerCase();
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + LOWER_CASE_REALTIME_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
+        }
+      } else {
+        if (!_tableNameMap.containsKey(lowerCaseRawTableName + LOWER_CASE_OFFLINE_TABLE_SUFFIX)) {
+          _tableNameMap.remove(lowerCaseRawTableName);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Exception subscribing/reading tableconfigs", e);
-        //ignore
       }
     }
+  }
+
+  private void addSchemas(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _schemaChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putSchema(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding schema for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
+  private void putSchema(ZNRecord znRecord)
+      throws IOException {
+    Schema schema = SchemaUtils.fromZNRecord(znRecord);
+    String rawTableName = schema.getSchemaName();
+    if (_caseInsensitive) {
+      Map<String, String> columnNameMap = new HashMap<>();
+      for (String columnName : schema.getColumnNames()) {
+        columnNameMap.put(columnName.toLowerCase(), columnName);
+      }
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+    } else {
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+    }
+  }
+
+  private void removeSchema(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _schemaChangeListener);
+    String rawTableName = path.substring(SCHEMA_PATH_PREFIX.length());
+    _schemaInfoMap.remove(rawTableName);
+  }
+
+  private class TableConfigChangeListener implements IZkChildListener, IZkDataListener {
 
     @Override
-    public void handleChildChange(String s, List<String> list)
-        throws Exception {
-      refresh();
+    public synchronized void handleChildChange(String path, List<String> tables) {

Review comment:
       We won't, because internally it resubscribes the watches before reading the child names.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org