You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2018/11/29 23:43:22 UTC
[incubator-pinot] 01/01: Add guava cache to cache table schema in
pinot broker;
Check for non-existent columns in broker. Emit a metric if a column is mismatched.
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 33a1db46cd5b1eee30084b921c33b0f8e579342d
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Nov 29 15:26:36 2018 -0800
Add guava cache to cache table schema in pinot broker;
Check for non-existent columns in broker.
Emit a metric if column mismatched.
---
.../pinot/broker/broker/BrokerServerBuilder.java | 11 ++-
.../broker/broker/helix/HelixBrokerStarter.java | 3 +-
.../requesthandler/BaseBrokerRequestHandler.java | 82 +++++++++++++++++++++-
.../ConnectionPoolBrokerRequestHandler.java | 10 ++-
.../SingleConnectionBrokerRequestHandler.java | 8 ++-
.../broker/requesthandler/TableSchemaCache.java | 67 ++++++++++++++++++
.../linkedin/pinot/common/metrics/BrokerMeter.java | 4 +-
7 files changed, 174 insertions(+), 11 deletions(-)
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
index c1e2d59..71ee2f8 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
@@ -30,6 +30,8 @@ import com.linkedin.pinot.common.utils.CommonConstants;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@ public class BrokerServerBuilder {
private final TimeBoundaryService _timeBoundaryService;
private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
private final TableQueryQuotaManager _tableQueryQuotaManager;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final AccessControlFactory _accessControlFactory;
private final MetricsRegistry _metricsRegistry;
private final BrokerMetrics _brokerMetrics;
@@ -70,7 +73,8 @@ public class BrokerServerBuilder {
private final BrokerAdminApiApplication _brokerAdminApplication;
public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
- LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) {
+ LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
_state.set(State.INIT);
_config = config;
_delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
@@ -78,6 +82,7 @@ public class BrokerServerBuilder {
_timeBoundaryService = timeBoundaryService;
_liveInstanceChangeListener = liveInstanceChangeListener;
_tableQueryQuotaManager = tableQueryQuotaManager;
+ _propertyStore = propertyStore;
_accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
_metricsRegistry = new MetricsRegistry();
MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
@@ -93,11 +98,11 @@ public class BrokerServerBuilder {
if (requestHandlerType.equalsIgnoreCase(SINGLE_CONNECTION_REQUEST_HANDLER_TYPE)) {
LOGGER.info("Using SingleConnectionBrokerRequestHandler");
return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
- _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+ _accessControlFactory, _tableQueryQuotaManager, _propertyStore, _brokerMetrics);
} else {
LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
- _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
+ _tableQueryQuotaManager, _propertyStore, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
}
}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
index ac7e6de..fa81795 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -198,7 +198,8 @@ public class HelixBrokerStarter {
config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
}
BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
- _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager);
+ _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager,
+ _propertyStore);
_accessControlFactory = brokerServerBuilder.getAccessControlFactory();
_helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
_tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 014842c..9cb5c33 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -23,30 +23,43 @@ import com.linkedin.pinot.broker.routing.RoutingTable;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
import com.linkedin.pinot.broker.routing.TimeBoundaryService;
import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.common.exception.QueryException;
import com.linkedin.pinot.common.metrics.BrokerMeter;
import com.linkedin.pinot.common.metrics.BrokerMetrics;
import com.linkedin.pinot.common.metrics.BrokerQueryPhase;
+import com.linkedin.pinot.common.request.AggregationInfo;
import com.linkedin.pinot.common.request.BrokerRequest;
import com.linkedin.pinot.common.request.FilterOperator;
import com.linkedin.pinot.common.request.FilterQuery;
import com.linkedin.pinot.common.request.FilterQueryMap;
+import com.linkedin.pinot.common.request.GroupBy;
+import com.linkedin.pinot.common.request.Selection;
+import com.linkedin.pinot.common.request.transform.TransformExpressionTree;
import com.linkedin.pinot.common.response.BrokerResponse;
import com.linkedin.pinot.common.response.broker.BrokerResponseNative;
import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.request.FilterQueryTree;
+import com.linkedin.pinot.common.utils.request.RequestUtils;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionType;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import com.linkedin.pinot.core.query.reduce.BrokerReduceService;
import com.linkedin.pinot.pql.parsers.Pql2Compiler;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,11 +78,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final TimeBoundaryService _timeBoundaryService;
protected final AccessControlFactory _accessControlFactory;
protected final TableQueryQuotaManager _tableQueryQuotaManager;
+ protected final ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected final BrokerMetrics _brokerMetrics;
protected final AtomicLong _requestIdGenerator = new AtomicLong();
protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
protected final BrokerReduceService _brokerReduceService = new BrokerReduceService();
+ protected final TableSchemaCache _tableSchemaCache;
protected final String _brokerId;
protected final long _brokerTimeoutMs;
@@ -78,13 +93,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+ TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+ BrokerMetrics brokerMetrics) {
_config = config;
_routingTable = routingTable;
_timeBoundaryService = timeBoundaryService;
_accessControlFactory = accessControlFactory;
_tableQueryQuotaManager = tableQueryQuotaManager;
+ _propertyStore = propertyStore;
_brokerMetrics = brokerMetrics;
+ _tableSchemaCache = new TableSchemaCache(_propertyStore);
_brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
_brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
@@ -311,6 +329,68 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
"Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + _queryResponseLimit);
}
}
+
+ // Checks whether the query references columns that do not exist in the table schema.
+ // Table name has already been verified before hitting this line.
+ String tableName = brokerRequest.getQuerySource().getTableName();
+ Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName);
+ if (schema != null) {
+ Set<String> allColumns = getAllColumnsFromBrokerRequest(brokerRequest);
+ Set<String> copied = new HashSet<>(allColumns);
+ copied.removeAll(schema.getColumnNames());
+ if (!copied.isEmpty()) {
+ _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.QUERY_NON_EXISTENCE_COLUMNS, 1L);
+ throw new RuntimeException("Found non-existence columns from the query: " + copied.toString());
+ }
+ } else {
+ // Cache miss: skip validation for this query and ask the cache to load the schema so later queries are checked.
+ _tableSchemaCache.refreshTableSchema(tableName);
+ }
+ }
+
+ /**
+ * Collects the names of all columns referenced by the filter, non-COUNT aggregations, group-by
+ * expressions and selection of the given broker request, and returns them as a new set.
+ */
+ private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest brokerRequest) {
+ Set<String> allColumns = new HashSet<>();
+ // Filter
+ FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
+ if (filterQueryTree != null) {
+ allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree));
+ }
+
+ // Aggregation (COUNT is skipped; presumably it has no real column argument - TODO confirm)
+ List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
+ if (aggregationsInfo != null) {
+ Set<TransformExpressionTree> aggregationExpressions = new HashSet<>();
+ for (AggregationInfo aggregationInfo : aggregationsInfo) {
+ if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
+ aggregationExpressions.add(
+ TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
+ }
+ }
+ allColumns.addAll(RequestUtils.extractColumnsFromExpressions(aggregationExpressions));
+ }
+
+ // Group-by
+ GroupBy groupBy = brokerRequest.getGroupBy();
+ if (groupBy != null) {
+ Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
+ for (String expression : groupBy.getExpressions()) {
+ groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+ }
+ allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions));
+ }
+
+
+ // Selection
+ Selection selection = brokerRequest.getSelections();
+ if (selection != null) {
+ allColumns.addAll(RequestUtils.extractSelectionColumns(selection));
+ }
+
+ return allColumns;
}
/**
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index b17eb42..09aa125 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,9 +87,11 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
- LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) {
- super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+ TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+ BrokerMetrics brokerMetrics, LiveInstancesChangeListenerImpl liveInstanceChangeListener,
+ MetricsRegistry metricsRegistry) {
+ super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+ brokerMetrics);
_liveInstanceChangeListener = liveInstanceChangeListener;
TransportClientConf transportClientConf = new TransportClientConf();
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 762b724..3389bcb 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -39,6 +39,8 @@ import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
/**
@@ -51,8 +53,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
- super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+ TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+ BrokerMetrics brokerMetrics) {
+ super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+ brokerMetrics);
_queryRouter = new QueryRouter(_brokerId, brokerMetrics);
}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
new file mode 100644
index 0000000..6c8c58c
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.requesthandler;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+/** Cache of table schemas keyed by raw table name; a Guava LoadingCache that loads via ZKMetadataProvider. */
+public class TableSchemaCache {
+ private static final long DEFAULT_CACHE_SIZE = 50;
+ private static final long DEFAULT_CACHE_TIMEOUT_IN_MINUTE = 60;
+ // The cache holds at most DEFAULT_CACHE_SIZE entries; each expires DEFAULT_CACHE_TIMEOUT_IN_MINUTE minutes after write.
+ private final LoadingCache<String, Schema> _tableSchemaCache;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ // Builds the cache; its loader reads the schema for a raw table name from the given property store.
+ TableSchemaCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ _propertyStore = propertyStore;
+ _tableSchemaCache = CacheBuilder.newBuilder()
+ .maximumSize(DEFAULT_CACHE_SIZE)
+ .expireAfterWrite(DEFAULT_CACHE_TIMEOUT_IN_MINUTE, TimeUnit.MINUTES)
+ .build(new CacheLoader<String, Schema>() {
+ @Override
+ public Schema load(String rawTableName) {
+ return ZKMetadataProvider.getTableSchema(_propertyStore, rawTableName);
+ }
+ });
+
+ }
+
+ /**
+ * Asks the cache to (re)load the schema for the raw table name derived from the given table name.
+ * @param tableName Table name with or without type suffix.
+ */
+ public void refreshTableSchema(String tableName) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ _tableSchemaCache.refresh(rawTableName);
+ }
+
+ /**
+ * Returns the schema currently cached for the table's raw name, or null if there is no cached value.
+ * @param tableName Table name with or without type suffix.
+ */
+ public Schema getIfTableSchemaPresent(String tableName) {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ return _tableSchemaCache.getIfPresent(rawTableName);
+ }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java
index 78b9264..18dfdf6 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/BrokerMeter.java
@@ -94,7 +94,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// Netty connection metrics
NETTY_CONNECTION_REQUESTS_SENT("nettyConnection", true),
NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
- NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true);
+ NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+
+ QUERY_NON_EXISTENCE_COLUMNS("queries", false);
private final String brokerMeterName;
private final String unit;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org