You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2018/11/29 23:26:58 UTC

[incubator-pinot] branch cache-table-schemas-in-broker created (now 496d147)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 496d147  Add guava cache to cache table schema in pinot broker

This branch includes the following new commits:

     new 496d147  Add guava cache to cache table schema in pinot broker

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add guava cache to cache table schema in pinot broker

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 496d147c557022a3cc991c6942e00f503d3e05f0
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Nov 29 15:26:36 2018 -0800

    Add guava cache to cache table schema in pinot broker
---
 .../broker/api/resources/PinotClientRequest.java   |  2 +
 .../pinot/broker/broker/BrokerServerBuilder.java   | 11 ++-
 .../broker/broker/helix/HelixBrokerStarter.java    |  3 +-
 .../requesthandler/BaseBrokerRequestHandler.java   | 83 +++++++++++++++++++++-
 .../ConnectionPoolBrokerRequestHandler.java        | 10 ++-
 .../SingleConnectionBrokerRequestHandler.java      |  8 ++-
 .../broker/requesthandler/TableSchemaCache.java    | 67 +++++++++++++++++
 7 files changed, 174 insertions(+), 10 deletions(-)

Review note: this file diff only adds the ZNRecord and ZkHelixPropertyStore imports. The patch adds no code to
PinotClientRequest that references them, so they look unused and this file could be dropped from the change.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
index 5b2bc6e..4b9b1e0 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
@@ -33,6 +33,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
Review note: BrokerServerBuilder now takes the Helix property store as an extra constructor argument and keeps it in
_propertyStore. It forwards the store to both SingleConnectionBrokerRequestHandler and
ConnectionPoolBrokerRequestHandler, right after the TableQueryQuotaManager argument.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
index c1e2d59..71ee2f8 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
@@ -30,6 +30,8 @@ import com.linkedin.pinot.common.utils.CommonConstants;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,7 @@ public class BrokerServerBuilder {
   private final TimeBoundaryService _timeBoundaryService;
   private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final AccessControlFactory _accessControlFactory;
   private final MetricsRegistry _metricsRegistry;
   private final BrokerMetrics _brokerMetrics;
@@ -70,7 +73,8 @@ public class BrokerServerBuilder {
   private final BrokerAdminApiApplication _brokerAdminApplication;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) {
+      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
     _state.set(State.INIT);
     _config = config;
     _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
@@ -78,6 +82,7 @@ public class BrokerServerBuilder {
     _timeBoundaryService = timeBoundaryService;
     _liveInstanceChangeListener = liveInstanceChangeListener;
     _tableQueryQuotaManager = tableQueryQuotaManager;
+    _propertyStore = propertyStore;
     _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
     _metricsRegistry = new MetricsRegistry();
     MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
@@ -93,11 +98,11 @@ public class BrokerServerBuilder {
     if (requestHandlerType.equalsIgnoreCase(SINGLE_CONNECTION_REQUEST_HANDLER_TYPE)) {
       LOGGER.info("Using SingleConnectionBrokerRequestHandler");
       return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
-          _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+          _accessControlFactory, _tableQueryQuotaManager, _propertyStore, _brokerMetrics);
     } else {
       LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
       return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
+          _tableQueryQuotaManager, _propertyStore, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
     }
   }
 
Review note: HelixBrokerStarter now passes its _propertyStore field to BrokerServerBuilder. This hunk does not show
where that field is initialized. Confirm it is set (non-null) before the builder is created, since every broker
request handler builds its TableSchemaCache from it.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
index ac7e6de..fa81795 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -198,7 +198,8 @@ public class HelixBrokerStarter {
       config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
     }
     BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager);
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager,
+        _propertyStore);
     _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
     _tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 014842c..37a8d3d 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -23,30 +23,43 @@ import com.linkedin.pinot.broker.routing.RoutingTable;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.broker.routing.TimeBoundaryService;
 import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.exception.QueryException;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import com.linkedin.pinot.common.metrics.BrokerQueryPhase;
+import com.linkedin.pinot.common.request.AggregationInfo;
 import com.linkedin.pinot.common.request.BrokerRequest;
 import com.linkedin.pinot.common.request.FilterOperator;
 import com.linkedin.pinot.common.request.FilterQuery;
 import com.linkedin.pinot.common.request.FilterQueryMap;
+import com.linkedin.pinot.common.request.GroupBy;
+import com.linkedin.pinot.common.request.Selection;
+import com.linkedin.pinot.common.request.transform.TransformExpressionTree;
 import com.linkedin.pinot.common.response.BrokerResponse;
 import com.linkedin.pinot.common.response.broker.BrokerResponseNative;
 import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.request.FilterQueryTree;
+import com.linkedin.pinot.common.utils.request.RequestUtils;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionType;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import com.linkedin.pinot.core.query.reduce.BrokerReduceService;
 import com.linkedin.pinot.pql.parsers.Pql2Compiler;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,11 +78,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final TimeBoundaryService _timeBoundaryService;
   protected final AccessControlFactory _accessControlFactory;
   protected final TableQueryQuotaManager _tableQueryQuotaManager;
+  protected final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
   protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
   protected final BrokerReduceService _brokerReduceService = new BrokerReduceService();
+  protected final TableSchemaCache _tableSchemaCache;
 
   protected final String _brokerId;
   protected final long _brokerTimeoutMs;
@@ -78,13 +93,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
     _config = config;
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
     _accessControlFactory = accessControlFactory;
     _tableQueryQuotaManager = tableQueryQuotaManager;
+    _propertyStore = propertyStore;
     _brokerMetrics = brokerMetrics;
+    _tableSchemaCache = new TableSchemaCache(_propertyStore);
 
     _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
@@ -311,6 +329,72 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
             "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + _queryResponseLimit);
       }
     }
+
+    // Rejects queries that reference columns missing from the table schema. Validation is skipped when the schema
+    // is not cached yet; in that case the schema cache is asked to load it so later queries can be validated.
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName);
+    if (schema != null) {
+      // getAllColumnsFromBrokerRequest() returns a fresh set, so it can be trimmed in place.
+      Set<String> missingColumns = getAllColumnsFromBrokerRequest(brokerRequest);
+      missingColumns.removeAll(schema.getColumnNames());
+      if (!missingColumns.isEmpty()) {
+        throw new RuntimeException("Found non-existent columns in the query: " + missingColumns);
+      }
+    } else {
+      _tableSchemaCache.refreshTableSchema(tableName);
+    }
+  }
+
+  /**
+   * Collects the names of all columns referenced by the broker request: filter columns, aggregation arguments
+   * (COUNT excluded), group-by expressions and selection columns.
+   *
+   * @param brokerRequest compiled broker request to inspect
+   * @return a new, mutable set of referenced column names, never containing the {@code *} wildcard
+   */
+  private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest brokerRequest) {
+    Set<String> allColumns = new HashSet<>();
+
+    // Filter
+    FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
+    if (filterQueryTree != null) {
+      allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree));
+    }
+
+    // Aggregation: COUNT is skipped since its argument is not a real column (presumably the '*' placeholder).
+    List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
+    if (aggregationsInfo != null) {
+      Set<TransformExpressionTree> aggregationExpressions = new HashSet<>();
+      for (AggregationInfo aggregationInfo : aggregationsInfo) {
+        if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
+          aggregationExpressions.add(
+              TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
+        }
+      }
+      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(aggregationExpressions));
+    }
+
+    // Group-by
+    GroupBy groupBy = brokerRequest.getGroupBy();
+    if (groupBy != null) {
+      Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
+      for (String expression : groupBy.getExpressions()) {
+        groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+      }
+      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions));
+    }
+
+    // Selection
+    Selection selection = brokerRequest.getSelections();
+    if (selection != null) {
+      allColumns.addAll(RequestUtils.extractSelectionColumns(selection));
+    }
+
+    // The select-all wildcard is not a schema column. Drop it in case a helper above returns it verbatim;
+    // otherwise "SELECT *" queries would be rejected.
+    allColumns.remove("*");
+    return allColumns;
   }
 
   /**
Review note: the property store is inserted between tableQueryQuotaManager and brokerMetrics. This matches the
parameter order of the BaseBrokerRequestHandler constructor it is forwarded to.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index b17eb42..09aa125 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,9 +87,11 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
 
   public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics, LiveInstancesChangeListenerImpl liveInstanceChangeListener,
+      MetricsRegistry metricsRegistry) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+        brokerMetrics);
     _liveInstanceChangeListener = liveInstanceChangeListener;
 
     TransportClientConf transportClientConf = new TransportClientConf();
Review note: same plumbing as the connection-pool handler. The property store is forwarded to the
BaseBrokerRequestHandler constructor between tableQueryQuotaManager and brokerMetrics.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 762b724..3389bcb 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -39,6 +39,8 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 
 
 /**
@@ -51,8 +53,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
 
   public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+        brokerMetrics);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
new file mode 100644
index 0000000..6c8c58c
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.requesthandler;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Broker-side cache of table schemas read from the Helix property store, keyed by raw table name (type suffix
+ * stripped).
+ *
+ * <p>At most {@code DEFAULT_CACHE_SIZE} tables are kept, and entries expire {@code DEFAULT_CACHE_TIMEOUT_IN_MINUTE}
+ * minutes after they are written. A table whose schema lookup returns {@code null} is cached as an empty
+ * {@link Optional}, because a Guava {@link CacheLoader} must not return {@code null}.
+ */
+public class TableSchemaCache {
+  private static final long DEFAULT_CACHE_SIZE = 50;
+  private static final long DEFAULT_CACHE_TIMEOUT_IN_MINUTE = 60;
+
+  private final LoadingCache<String, Optional<Schema>> _tableSchemaCache;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  /**
+   * @param propertyStore Helix property store the schemas are read from.
+   */
+  TableSchemaCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+    _tableSchemaCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_CACHE_SIZE)
+        .expireAfterWrite(DEFAULT_CACHE_TIMEOUT_IN_MINUTE, TimeUnit.MINUTES)
+        .build(new CacheLoader<String, Optional<Schema>>() {
+          @Override
+          public Optional<Schema> load(String rawTableName) {
+            // getTableSchema() may return null (presumably when no schema is stored). Wrap it so the miss is
+            // cached, instead of Guava throwing InvalidCacheLoadException on a null load result.
+            return Optional.ofNullable(ZKMetadataProvider.getTableSchema(_propertyStore, rawTableName));
+          }
+        });
+  }
+
+  /**
+   * Reloads the schema of the given table from the property store into the cache.
+   *
+   * <p>This loader does not override {@link CacheLoader#reload}, so the property store is read synchronously on
+   * the calling thread. Guava logs and swallows load failures, keeping any previously cached value.
+   *
+   * @param tableName Table name with or without type suffix.
+   */
+  public void refreshTableSchema(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    _tableSchemaCache.refresh(rawTableName);
+  }
+
+  /**
+   * Returns the cached schema of the given table without triggering a load.
+   *
+   * @param tableName Table name with or without type suffix.
+   * @return The cached schema, or {@code null} if the table is not cached or has no schema.
+   */
+  @Nullable
+  public Schema getIfTableSchemaPresent(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    Optional<Schema> cachedSchema = _tableSchemaCache.getIfPresent(rawTableName);
+    return cachedSchema == null ? null : cachedSchema.orElse(null);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org