You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2018/11/29 23:43:22 UTC

[incubator-pinot] 01/01: Add Guava cache for table schemas in Pinot broker; check for nonexistent columns in broker. Emit a metric if columns mismatch.

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 33a1db46cd5b1eee30084b921c33b0f8e579342d
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Nov 29 15:26:36 2018 -0800

    Add Guava cache for table schemas in Pinot broker;
    Check for nonexistent columns in broker.
    Emit a metric if columns mismatch.
---
 .../pinot/broker/broker/BrokerServerBuilder.java   | 11 ++-
 .../broker/broker/helix/HelixBrokerStarter.java    |  3 +-
 .../requesthandler/BaseBrokerRequestHandler.java   | 82 +++++++++++++++++++++-
 .../ConnectionPoolBrokerRequestHandler.java        | 10 ++-
 .../SingleConnectionBrokerRequestHandler.java      |  8 ++-
 .../broker/requesthandler/TableSchemaCache.java    | 67 ++++++++++++++++++
 .../linkedin/pinot/common/metrics/BrokerMeter.java |  4 +-
 7 files changed, 174 insertions(+), 11 deletions(-)

diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
index c1e2d59..71ee2f8 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
@@ -30,6 +30,8 @@ import com.linkedin.pinot.common.utils.CommonConstants;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,7 @@ public class BrokerServerBuilder {
   private final TimeBoundaryService _timeBoundaryService;
   private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore; // Forwarded to the broker request handlers
   private final AccessControlFactory _accessControlFactory;
   private final MetricsRegistry _metricsRegistry;
   private final BrokerMetrics _brokerMetrics;
@@ -70,7 +73,8 @@ public class BrokerServerBuilder {
   private final BrokerAdminApiApplication _brokerAdminApplication;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) {
+      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
     _state.set(State.INIT);
     _config = config;
     _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
@@ -78,6 +82,7 @@ public class BrokerServerBuilder {
     _timeBoundaryService = timeBoundaryService;
     _liveInstanceChangeListener = liveInstanceChangeListener;
     _tableQueryQuotaManager = tableQueryQuotaManager;
+    _propertyStore = propertyStore;
     _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
     _metricsRegistry = new MetricsRegistry();
     MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
@@ -93,11 +98,11 @@ public class BrokerServerBuilder {
     if (requestHandlerType.equalsIgnoreCase(SINGLE_CONNECTION_REQUEST_HANDLER_TYPE)) {
       LOGGER.info("Using SingleConnectionBrokerRequestHandler");
       return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
-          _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+          _accessControlFactory, _tableQueryQuotaManager, _propertyStore, _brokerMetrics);
     } else {
       LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
       return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
+          _tableQueryQuotaManager, _propertyStore, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
     }
   }
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
index ac7e6de..fa81795 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -198,7 +198,8 @@ public class HelixBrokerStarter {
       config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
     }
     BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager);
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager,
+        _propertyStore); // Source of table schemas for the request handlers' schema cache
     _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
     _tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 014842c..9cb5c33 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -23,30 +23,43 @@ import com.linkedin.pinot.broker.routing.RoutingTable;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.broker.routing.TimeBoundaryService;
 import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.exception.QueryException;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import com.linkedin.pinot.common.metrics.BrokerQueryPhase;
+import com.linkedin.pinot.common.request.AggregationInfo;
 import com.linkedin.pinot.common.request.BrokerRequest;
 import com.linkedin.pinot.common.request.FilterOperator;
 import com.linkedin.pinot.common.request.FilterQuery;
 import com.linkedin.pinot.common.request.FilterQueryMap;
+import com.linkedin.pinot.common.request.GroupBy;
+import com.linkedin.pinot.common.request.Selection;
+import com.linkedin.pinot.common.request.transform.TransformExpressionTree;
 import com.linkedin.pinot.common.response.BrokerResponse;
 import com.linkedin.pinot.common.response.broker.BrokerResponseNative;
 import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.request.FilterQueryTree;
+import com.linkedin.pinot.common.utils.request.RequestUtils;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionType;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import com.linkedin.pinot.core.query.reduce.BrokerReduceService;
 import com.linkedin.pinot.pql.parsers.Pql2Compiler;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,11 +78,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final TimeBoundaryService _timeBoundaryService;
   protected final AccessControlFactory _accessControlFactory;
   protected final TableQueryQuotaManager _tableQueryQuotaManager;
+  protected final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
   protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
   protected final BrokerReduceService _brokerReduceService = new BrokerReduceService();
+  protected final TableSchemaCache _tableSchemaCache; // Schemas used to reject queries on unknown columns
 
   protected final String _brokerId;
   protected final long _brokerTimeoutMs;
@@ -78,13 +93,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
     _config = config;
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
     _accessControlFactory = accessControlFactory;
     _tableQueryQuotaManager = tableQueryQuotaManager;
+    _propertyStore = propertyStore;
     _brokerMetrics = brokerMetrics;
+    _tableSchemaCache = new TableSchemaCache(_propertyStore);
 
     _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
@@ -311,6 +329,68 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
             "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + _queryResponseLimit);
       }
     }
+
+    // Reject queries that reference columns missing from the table schema.
+    // Table name has already been verified before hitting this line.
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName);
+    if (schema != null) {
+      Set<String> nonExistentColumns = getAllColumnsFromBrokerRequest(brokerRequest);
+      nonExistentColumns.removeAll(schema.getColumnNames());
+      if (!nonExistentColumns.isEmpty()) {
+        _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.QUERY_NON_EXISTENCE_COLUMNS, 1L);
+        throw new RuntimeException("Found non-existence columns from the query: " + nonExistentColumns);
+      }
+    } else {
+      // Schema not cached yet: ask the cache to load it, and let this query through without the column check.
+      _tableSchemaCache.refreshTableSchema(tableName);
+    }
+  }
+
+  /**
+   * Collects every column referenced by the broker request (filter, aggregation, group-by and selection).
+   *
+   * @param brokerRequest the compiled broker request
+   * @return a newly allocated, mutable set of column names
+   */
+  private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest brokerRequest) {
+    Set<String> allColumns = new HashSet<>();
+    // Filter
+    FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
+    if (filterQueryTree != null) {
+      allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree));
+    }
+
+    // Aggregation: COUNT is skipped, presumably because its argument (e.g. '*') is not a schema column.
+    List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
+    if (aggregationsInfo != null) {
+      Set<TransformExpressionTree> aggregationExpressions = new HashSet<>();
+      for (AggregationInfo aggregationInfo : aggregationsInfo) {
+        if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
+          aggregationExpressions.add(
+              TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
+        }
+      }
+      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(aggregationExpressions));
+    }
+
+    // Group-by
+    GroupBy groupBy = brokerRequest.getGroupBy();
+    if (groupBy != null) {
+      Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
+      for (String expression : groupBy.getExpressions()) {
+        groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+      }
+      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions));
+    }
+
+    // Selection
+    Selection selection = brokerRequest.getSelections();
+    if (selection != null) {
+      allColumns.addAll(RequestUtils.extractSelectionColumns(selection));
+    }
+
+    return allColumns;
   }
 
   /**
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index b17eb42..09aa125 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,9 +87,11 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
 
   public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics, LiveInstancesChangeListenerImpl liveInstanceChangeListener,
+      MetricsRegistry metricsRegistry) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+        brokerMetrics); // propertyStore backs the base handler's table schema cache
     _liveInstanceChangeListener = liveInstanceChangeListener;
 
     TransportClientConf transportClientConf = new TransportClientConf();
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 762b724..3389bcb 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -39,6 +39,8 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 
 
 /**
@@ -51,8 +53,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
 
   public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+        brokerMetrics); // propertyStore backs the base handler's table schema cache
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
new file mode 100644
index 0000000..6c8c58c
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.requesthandler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Broker-side cache of table schemas, keyed by raw table name (type suffix stripped).
+ *
+ * <p>{@link #getIfTableSchemaPresent(String)} only reads the in-memory cache and never contacts ZooKeeper.
+ * {@link #refreshTableSchema(String)} hands the ZooKeeper read to a single background daemon thread, so the
+ * query thread is never blocked by it. While a load for a table is pending, further refresh requests for that
+ * table are ignored. Tables with no schema in ZooKeeper are not cached, because Guava caches cannot hold null.
+ *
+ * <p>Thread-safe: the cache, the pending-set and the executor may all be used concurrently.
+ */
+public class TableSchemaCache {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaCache.class);
+  private static final long DEFAULT_CACHE_SIZE = 50;
+  private static final long DEFAULT_CACHE_TIMEOUT_IN_MINUTE = 60;
+
+  private final Cache<String, Schema> _tableSchemaCache;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  // Raw table names whose schema load is queued or running; prevents piling up duplicate ZooKeeper reads.
+  private final Set<String> _pendingRefreshes = ConcurrentHashMap.newKeySet();
+  // Daemon thread, so this cache never keeps the JVM alive.
+  private final ExecutorService _refreshExecutor = Executors.newSingleThreadExecutor(runnable -> {
+    Thread thread = new Thread(runnable, "TableSchemaCacheRefresher");
+    thread.setDaemon(true);
+    return thread;
+  });
+
+  TableSchemaCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+    _tableSchemaCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_CACHE_SIZE)
+        .expireAfterWrite(DEFAULT_CACHE_TIMEOUT_IN_MINUTE, TimeUnit.MINUTES)
+        .build();
+  }
+
+  /**
+   * Asynchronously loads the table schema from ZooKeeper into the cache. Returns immediately; does nothing if a
+   * load for the same table is already pending.
+   *
+   * @param tableName Table name with or without type suffix.
+   */
+  public void refreshTableSchema(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    if (_pendingRefreshes.add(rawTableName)) {
+      _refreshExecutor.execute(() -> loadTableSchema(rawTableName));
+    }
+  }
+
+  /**
+   * Gets table schema if it's present in the cache, without contacting ZooKeeper.
+   *
+   * @param tableName Table name with or without type suffix.
+   * @return the cached schema, or null if it is not loaded yet, has expired, or ZooKeeper returned no schema.
+   */
+  @Nullable
+  public Schema getIfTableSchemaPresent(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    return _tableSchemaCache.getIfPresent(rawTableName);
+  }
+
+  /**
+   * Reads the table schema from ZooKeeper and caches it if it exists. Runs on the refresh thread.
+   */
+  private void loadTableSchema(String rawTableName) {
+    try {
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, rawTableName);
+      if (schema != null) {
+        _tableSchemaCache.put(rawTableName, schema);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while loading schema for table: {}", rawTableName, e);
+    } finally {
+      // Always clear the marker; otherwise this table could never be refreshed again.
+      _pendingRefreshes.remove(rawTableName);
+    }
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java
index 78b9264..18dfdf6 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java
@@ -94,7 +94,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   // Netty connection metrics
   NETTY_CONNECTION_REQUESTS_SENT("nettyConnection", true),
   NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
-  NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true);
+  NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+  // Per-table count of queries rejected for referencing columns missing from the table schema
+  QUERY_NON_EXISTENCE_COLUMNS("queries", false);
 
   private final String brokerMeterName;
   private final String unit;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org