You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2018/11/29 23:29:56 UTC

[incubator-pinot] branch cache-table-schemas-in-broker updated (496d147 -> d01c795)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 496d147  Add guava cache to cache table schema in pinot broker
     new d01c795  Add guava cache to cache table schema in pinot broker

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (496d147)
            \
             N -- N -- N   refs/heads/cache-table-schemas-in-broker (d01c795)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../com/linkedin/pinot/broker/api/resources/PinotClientRequest.java     | 2 --
 1 file changed, 2 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add guava cache to cache table schema in pinot broker

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d01c7959c0a7cce1dc86c28c7e4d5403a0bd7152
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Nov 29 15:26:36 2018 -0800

    Add guava cache to cache table schema in pinot broker
---
 .../pinot/broker/broker/BrokerServerBuilder.java   | 11 ++-
 .../broker/broker/helix/HelixBrokerStarter.java    |  3 +-
 .../requesthandler/BaseBrokerRequestHandler.java   | 83 +++++++++++++++++++++-
 .../ConnectionPoolBrokerRequestHandler.java        | 10 ++-
 .../SingleConnectionBrokerRequestHandler.java      |  8 ++-
 .../broker/requesthandler/TableSchemaCache.java    | 67 +++++++++++++++++
 6 files changed, 172 insertions(+), 10 deletions(-)

diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
index c1e2d59..71ee2f8 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/BrokerServerBuilder.java
@@ -30,6 +30,8 @@ import com.linkedin.pinot.common.utils.CommonConstants;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,7 @@ public class BrokerServerBuilder {
   private final TimeBoundaryService _timeBoundaryService;
   private final LiveInstancesChangeListenerImpl _liveInstanceChangeListener;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final AccessControlFactory _accessControlFactory;
   private final MetricsRegistry _metricsRegistry;
   private final BrokerMetrics _brokerMetrics;
@@ -70,7 +73,8 @@ public class BrokerServerBuilder {
   private final BrokerAdminApiApplication _brokerAdminApplication;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager) {
+      LiveInstancesChangeListenerImpl liveInstanceChangeListener, TableQueryQuotaManager tableQueryQuotaManager,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
     _state.set(State.INIT);
     _config = config;
     _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
@@ -78,6 +82,7 @@ public class BrokerServerBuilder {
     _timeBoundaryService = timeBoundaryService;
     _liveInstanceChangeListener = liveInstanceChangeListener;
     _tableQueryQuotaManager = tableQueryQuotaManager;
+    _propertyStore = propertyStore;
     _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
     _metricsRegistry = new MetricsRegistry();
     MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
@@ -93,11 +98,11 @@ public class BrokerServerBuilder {
     if (requestHandlerType.equalsIgnoreCase(SINGLE_CONNECTION_REQUEST_HANDLER_TYPE)) {
       LOGGER.info("Using SingleConnectionBrokerRequestHandler");
       return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
-          _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+          _accessControlFactory, _tableQueryQuotaManager, _propertyStore, _brokerMetrics);
     } else {
       LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
       return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
+          _tableQueryQuotaManager, _propertyStore, _brokerMetrics, _liveInstanceChangeListener, _metricsRegistry);
     }
   }
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
index ac7e6de..fa81795 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -198,7 +198,8 @@ public class HelixBrokerStarter {
       config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
     }
     BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager);
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstancesListener, _tableQueryQuotaManager,
+        _propertyStore);
     _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
     _tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 014842c..37a8d3d 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -23,30 +23,46 @@ import com.linkedin.pinot.broker.routing.RoutingTable;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.broker.routing.TimeBoundaryService;
 import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.exception.QueryException;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import com.linkedin.pinot.common.metrics.BrokerQueryPhase;
+import com.linkedin.pinot.common.request.AggregationInfo;
 import com.linkedin.pinot.common.request.BrokerRequest;
 import com.linkedin.pinot.common.request.FilterOperator;
 import com.linkedin.pinot.common.request.FilterQuery;
 import com.linkedin.pinot.common.request.FilterQueryMap;
+import com.linkedin.pinot.common.request.GroupBy;
+import com.linkedin.pinot.common.request.Selection;
+import com.linkedin.pinot.common.request.transform.TransformExpressionTree;
 import com.linkedin.pinot.common.response.BrokerResponse;
 import com.linkedin.pinot.common.response.broker.BrokerResponseNative;
 import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.request.FilterQueryTree;
+import com.linkedin.pinot.common.utils.request.RequestUtils;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionType;
+import com.linkedin.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import com.linkedin.pinot.core.query.reduce.BrokerReduceService;
 import com.linkedin.pinot.pql.parsers.Pql2Compiler;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,11 +81,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final TimeBoundaryService _timeBoundaryService;
   protected final AccessControlFactory _accessControlFactory;
   protected final TableQueryQuotaManager _tableQueryQuotaManager;
+  protected final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
   protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
   protected final BrokerReduceService _brokerReduceService = new BrokerReduceService();
+  protected final TableSchemaCache _tableSchemaCache;
 
   protected final String _brokerId;
   protected final long _brokerTimeoutMs;
@@ -78,13 +96,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
     _config = config;
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
     _accessControlFactory = accessControlFactory;
     _tableQueryQuotaManager = tableQueryQuotaManager;
+    _propertyStore = propertyStore;
     _brokerMetrics = brokerMetrics;
+    _tableSchemaCache = new TableSchemaCache(_propertyStore);
 
     _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
@@ -311,6 +332,66 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
             "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + _queryResponseLimit);
       }
     }
+
+    // Checks whether the query contains non-existence columns.
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName);
+    if (schema != null) {
+      Set<String> allColumns = getAllColumnsFromBrokerRequest(brokerRequest);
+      Set<String> copied = new HashSet<>(allColumns);
+      copied.removeAll(schema.getColumnNames());
+      if (!copied.isEmpty()) {
+        throw new RuntimeException("Found non-existence columns from the query: " + copied.toString());
+      }
+    } else {
+      // If the cache doesn't have the schema, loads the schema to the cache asynchronously.
+      _tableSchemaCache.refreshTableSchema(tableName);
+    }
+  }
+
+  /**
+   * Helper to get all the columns from broker request.
+   * Returns the set of all the columns.
+   */
+  private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest brokerRequest) {
+    Set<String> allColumns = new HashSet<>();
+    // Filter
+    FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
+    if (filterQueryTree != null) {
+      allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree));
+    }
+
+    // Aggregation
+    List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
+    if (aggregationsInfo != null) {
+      Set<TransformExpressionTree> _aggregationExpressions = new HashSet<>();
+      for (AggregationInfo aggregationInfo : aggregationsInfo) {
+        if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
+          _aggregationExpressions.add(
+              TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
+        }
+      }
+      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(_aggregationExpressions));
+    }
+
+    // Group-by
+    GroupBy groupBy = brokerRequest.getGroupBy();
+    if (groupBy != null) {
+      Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
+      for (String expression : groupBy.getExpressions()) {
+        groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+      }
+      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions));
+    }
+
+
+    // Selection
+    Selection selection = brokerRequest.getSelections();
+    if (selection != null) {
+      allColumns.addAll(RequestUtils.extractSelectionColumns(selection));
+    }
+
+    return allColumns;
   }
 
   /**
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index b17eb42..09aa125 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,9 +87,11 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
 
   public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
-      LiveInstancesChangeListenerImpl liveInstanceChangeListener, MetricsRegistry metricsRegistry) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics, LiveInstancesChangeListenerImpl liveInstanceChangeListener,
+      MetricsRegistry metricsRegistry) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+        brokerMetrics);
     _liveInstanceChangeListener = liveInstanceChangeListener;
 
     TransportClientConf transportClientConf = new TransportClientConf();
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 762b724..3389bcb 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -39,6 +39,8 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 
 
 /**
@@ -51,8 +53,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
 
   public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      TableQueryQuotaManager tableQueryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, propertyStore,
+        brokerMetrics);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
new file mode 100644
index 0000000..6c8c58c
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/TableSchemaCache.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.requesthandler;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+public class TableSchemaCache {
+  private static final long DEFAULT_CACHE_SIZE = 50;
+  private static final long DEFAULT_CACHE_TIMEOUT_IN_MINUTE = 60;
+
+  private final LoadingCache<String, Schema> _tableSchemaCache;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  TableSchemaCache(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+    _tableSchemaCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_CACHE_SIZE)
+        .expireAfterWrite(DEFAULT_CACHE_TIMEOUT_IN_MINUTE, TimeUnit.MINUTES)
+        .build(new CacheLoader<String, Schema>() {
+          @Override
+          public Schema load(String rawTableName) {
+            return ZKMetadataProvider.getTableSchema(_propertyStore, rawTableName);
+          }
+        });
+
+  }
+
+  /**
+   * Refreshes table schema.
+   * @param tableName Table name with or without type suffix.
+   */
+  public void refreshTableSchema(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    _tableSchemaCache.refresh(rawTableName);
+  }
+
+  /**
+   * Gets table schema if it's present.
+   * @param tableName Table name with or without type suffix.
+   */
+  public Schema getIfTableSchemaPresent(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    return _tableSchemaCache.getIfPresent(rawTableName);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org