You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/25 21:57:16 UTC

[incubator-pinot] branch cache-table-schemas-in-broker updated: Extract similar methods into one method

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch cache-table-schemas-in-broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/cache-table-schemas-in-broker by this push:
     new ae03d65  Extract similar methods into one method
ae03d65 is described below

commit ae03d6570f864d98d663b23ed7b51e26fe9d4470
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Jan 25 13:56:58 2019 -0800

    Extract similar methods into one method
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 60 ++--------------
 .../pinot/common/utils/request/RequestInfo.java    | 80 +++++++++++++++++++++
 .../pinot/common/utils/request/RequestUtils.java   | 73 +++++++++++++++++++
 .../core/query/request/ServerQueryRequest.java     | 83 +++-------------------
 4 files changed, 171 insertions(+), 125 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 465c6cb..e106baf 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,21 +44,16 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerQueryPhase;
-import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.request.FilterQuery;
 import org.apache.pinot.common.request.FilterQueryMap;
-import org.apache.pinot.common.request.GroupBy;
-import org.apache.pinot.common.request.Selection;
-import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestInfo;
 import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.slf4j.Logger;
@@ -350,8 +344,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       String tableName = brokerRequest.getQuerySource().getTableName();
       Schema schema = _tableSchemaCache.getIfTableSchemaPresent(tableName);
       if (schema != null) {
-        Set<String> columnsFromBrokerRequest = getAllColumnsFromBrokerRequest(brokerRequest, filterQueryTree);
-        // Filters out virtual columns in the query.
+        RequestInfo requestInfo = RequestUtils.preComputeRequestInfo(brokerRequest);
+        // Collects every column referenced by the broker request (filter, aggregation, group-by and selection).
+        Set<String> columnsFromBrokerRequest = requestInfo.getAllColumns();
+        // Drops virtual columns (names starting with "$"), then the columns that exist in the schema.
         columnsFromBrokerRequest.removeIf(column -> column.startsWith("$"));
         columnsFromBrokerRequest.removeAll(schema.getColumnNames());
         if (!columnsFromBrokerRequest.isEmpty()) {
@@ -359,6 +355,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
           throw new RuntimeException(
               "Found non-existent columns from the query: " + columnsFromBrokerRequest.toString());
         }
+        // Reuses the filter query tree generated above, which is needed later by the optimization step.
+        filterQueryTree[0] = requestInfo.getFilterQueryTree();
       } else {
         // If the cache doesn't have the schema, loads the schema to the cache asynchronously.
         _tableSchemaCache.refreshTableSchema(tableName);
@@ -367,50 +365,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Helper to get all the columns from broker request.
-   * Returns the set of all the columns.
-   */
-  private Set<String> getAllColumnsFromBrokerRequest(BrokerRequest brokerRequest, FilterQueryTree[] filterQueryTree) {
-    Set<String> allColumns = new HashSet<>();
-    // Filter
-    filterQueryTree[0] = RequestUtils.generateFilterQueryTree(brokerRequest);
-    if (filterQueryTree[0] != null) {
-      allColumns.addAll(RequestUtils.extractFilterColumns(filterQueryTree[0]));
-    }
-
-    // Aggregation
-    List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
-    if (aggregationsInfo != null) {
-      Set<TransformExpressionTree> _aggregationExpressions = new HashSet<>();
-      for (AggregationInfo aggregationInfo : aggregationsInfo) {
-        if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
-          _aggregationExpressions.add(
-              TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
-        }
-      }
-      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(_aggregationExpressions));
-    }
-
-    // Group-by
-    GroupBy groupBy = brokerRequest.getGroupBy();
-    if (groupBy != null) {
-      Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
-      for (String expression : groupBy.getExpressions()) {
-        groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
-      }
-      allColumns.addAll(RequestUtils.extractColumnsFromExpressions(groupByExpressions));
-    }
-
-    // Selection
-    Selection selection = brokerRequest.getSelections();
-    if (selection != null) {
-      allColumns.addAll(RequestUtils.extractSelectionColumns(selection));
-    }
-
-    return allColumns;
-  }
-
-  /**
    * Helper method to get the time column name for the OFFLINE table name from the time boundary service, or
    * <code>null</code> if the time boundary service does not have the information.
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestInfo.java
new file mode 100644
index 0000000..41257f6
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestInfo.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.request;
+
+import java.util.Set;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+
+
+public class RequestInfo {
+  // Pre-computed segment independent information
+  private final Set<String> _allColumns;
+  private final FilterQueryTree _filterQueryTree;
+  private final Set<String> _filterColumns;
+  private final Set<TransformExpressionTree> _aggregationExpressions;
+  private final Set<String> _aggregationColumns;
+  private final Set<TransformExpressionTree> _groupByExpressions;
+  private final Set<String> _groupByColumns;
+  private final Set<String> _selectionColumns;
+
+  public RequestInfo(Set<String> allColumns, FilterQueryTree filterQueryTree, Set<String> filterColumns,
+      Set<TransformExpressionTree> aggregationExpressions, Set<String> aggregationColumns,
+      Set<TransformExpressionTree> groupByExpressions, Set<String> groupByColumns, Set<String> selectionColumns) {
+    _allColumns = allColumns;
+    _filterQueryTree = filterQueryTree;
+    _filterColumns = filterColumns;
+    _aggregationExpressions = aggregationExpressions;
+    _aggregationColumns = aggregationColumns;
+    _groupByExpressions = groupByExpressions;
+    _groupByColumns = groupByColumns;
+    _selectionColumns = selectionColumns;
+  }
+
+  public Set<String> getAllColumns() {
+    return _allColumns;
+  }
+
+  public FilterQueryTree getFilterQueryTree() {
+    return _filterQueryTree;
+  }
+
+  public Set<String> getFilterColumns() {
+    return _filterColumns;
+  }
+
+  public Set<TransformExpressionTree> getAggregationExpressions() {
+    return _aggregationExpressions;
+  }
+
+  public Set<String> getAggregationColumns() {
+    return _aggregationColumns;
+  }
+
+  public Set<TransformExpressionTree> getGroupByExpressions() {
+    return _groupByExpressions;
+  }
+
+  public Set<String> getGroupByColumns() {
+    return _groupByColumns;
+  }
+
+  public Set<String> getSelectionColumns() {
+    return _selectionColumns;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index f914714..8641faf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -26,9 +26,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.FilterQuery;
 import org.apache.pinot.common.request.FilterQueryMap;
+import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.request.HavingFilterQuery;
 import org.apache.pinot.common.request.HavingFilterQueryMap;
 import org.apache.pinot.common.request.Selection;
@@ -37,6 +39,9 @@ import org.apache.pinot.common.request.transform.TransformExpressionTree;
 
 
 public class RequestUtils {
+  private static final String COLUMN_KEY = "column";
+  private static final String COUNT = "count";
+
   private RequestUtils() {
   }
 
@@ -204,4 +209,72 @@ public class RequestUtils {
     }
     return selectionColumns;
   }
+
+  /**
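+   * Pre-computes the segment independent information of a request: filter tree, expressions and referenced columns.
+   * @param brokerRequest broker request to extract the information from
+   * @return RequestInfo; its filter, aggregation, group-by and selection fields may be null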
+   */
+  public static RequestInfo preComputeRequestInfo(BrokerRequest brokerRequest) {
+    Set<String> allColumns = new HashSet<>();
+    FilterQueryTree filterQueryTree;
+    Set<String> filterColumns;
+    Set<TransformExpressionTree> aggregationExpressions;
+    Set<String> aggregationColumns;
+    Set<TransformExpressionTree> groupByExpressions;
+    Set<String> groupByColumns;
+    Set<String> selectionColumns;
+
+    // Filter
+    filterQueryTree = generateFilterQueryTree(brokerRequest);
+    if (filterQueryTree != null) {
+      filterColumns = extractFilterColumns(filterQueryTree);
+      allColumns.addAll(filterColumns);
+    } else {
+      filterColumns = null;
+    }
+
+    // Aggregation
+    List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
+    if (aggregationsInfo != null) {
+      aggregationExpressions = new HashSet<>();
+      for (AggregationInfo aggregationInfo : aggregationsInfo) {
+        if (!aggregationInfo.getAggregationType().equalsIgnoreCase(COUNT)) {
+          aggregationExpressions.add(
+              TransformExpressionTree.compileToExpressionTree(aggregationInfo.getAggregationParams().get(COLUMN_KEY)));
+        }
+      }
+      aggregationColumns = extractColumnsFromExpressions(aggregationExpressions);
+      allColumns.addAll(aggregationColumns);
+    } else {
+      aggregationExpressions = null;
+      aggregationColumns = null;
+    }
+
+    // Group-by
+    GroupBy groupBy = brokerRequest.getGroupBy();
+    if (groupBy != null) {
+      groupByExpressions = new HashSet<>();
+      for (String expression : groupBy.getExpressions()) {
+        groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+      }
+      groupByColumns = extractColumnsFromExpressions(groupByExpressions);
+      allColumns.addAll(groupByColumns);
+    } else {
+      groupByExpressions = null;
+      groupByColumns = null;
+    }
+
+    // Selection
+    Selection selection = brokerRequest.getSelections();
+    if (selection != null) {
+      selectionColumns = extractSelectionColumns(selection);
+      allColumns.addAll(selectionColumns);
+    } else {
+      selectionColumns = null;
+    }
+
+    return new RequestInfo(allColumns, filterQueryTree, filterColumns, aggregationExpressions, aggregationColumns,
+        groupByExpressions, groupByColumns, selectionColumns);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 5c4bb95..837b8b6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -18,21 +18,16 @@
  */
 package org.apache.pinot.core.query.request;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestInfo;
 import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.TimerContext;
 
 
@@ -54,14 +49,7 @@ public class ServerQueryRequest {
   private final TimerContext _timerContext;
 
   // Pre-computed segment independent information
-  private final Set<String> _allColumns;
-  private final FilterQueryTree _filterQueryTree;
-  private final Set<String> _filterColumns;
-  private final Set<TransformExpressionTree> _aggregationExpressions;
-  private final Set<String> _aggregationColumns;
-  private final Set<TransformExpressionTree> _groupByExpressions;
-  private final Set<String> _groupByColumns;
-  private final Set<String> _selectionColumns;
+  private final RequestInfo _requestInfo;
 
   // Query processing context
   private volatile int _segmentCountAfterPruning = -1;
@@ -76,56 +64,7 @@ public class ServerQueryRequest {
     _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs);
 
     // Pre-compute segment independent information
-    _allColumns = new HashSet<>();
-
-    // Filter
-    _filterQueryTree = RequestUtils.generateFilterQueryTree(_brokerRequest);
-    if (_filterQueryTree != null) {
-      _filterColumns = RequestUtils.extractFilterColumns(_filterQueryTree);
-      _allColumns.addAll(_filterColumns);
-    } else {
-      _filterColumns = null;
-    }
-
-    // Aggregation
-    List<AggregationInfo> aggregationsInfo = _brokerRequest.getAggregationsInfo();
-    if (aggregationsInfo != null) {
-      _aggregationExpressions = new HashSet<>();
-      for (AggregationInfo aggregationInfo : aggregationsInfo) {
-        if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
-          _aggregationExpressions.add(
-              TransformExpressionTree.compileToExpressionTree(AggregationFunctionUtils.getColumn(aggregationInfo)));
-        }
-      }
-      _aggregationColumns = RequestUtils.extractColumnsFromExpressions(_aggregationExpressions);
-      _allColumns.addAll(_aggregationColumns);
-    } else {
-      _aggregationExpressions = null;
-      _aggregationColumns = null;
-    }
-
-    // Group-by
-    GroupBy groupBy = _brokerRequest.getGroupBy();
-    if (groupBy != null) {
-      _groupByExpressions = new HashSet<>();
-      for (String expression : groupBy.getExpressions()) {
-        _groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
-      }
-      _groupByColumns = RequestUtils.extractColumnsFromExpressions(_groupByExpressions);
-      _allColumns.addAll(_groupByColumns);
-    } else {
-      _groupByExpressions = null;
-      _groupByColumns = null;
-    }
-
-    // Selection
-    Selection selection = _brokerRequest.getSelections();
-    if (selection != null) {
-      _selectionColumns = RequestUtils.extractSelectionColumns(selection);
-      _allColumns.addAll(_selectionColumns);
-    } else {
-      _selectionColumns = null;
-    }
+    _requestInfo = RequestUtils.preComputeRequestInfo(_brokerRequest);
   }
 
   public long getRequestId() {
@@ -157,41 +96,41 @@ public class ServerQueryRequest {
   }
 
   public Set<String> getAllColumns() {
-    return _allColumns;
+    return _requestInfo.getAllColumns();
   }
 
   @Nullable
   public FilterQueryTree getFilterQueryTree() {
-    return _filterQueryTree;
+    return _requestInfo.getFilterQueryTree();
   }
 
   @Nullable
   public Set<String> getFilterColumns() {
-    return _filterColumns;
+    return _requestInfo.getFilterColumns();
   }
 
   @Nullable
   public Set<TransformExpressionTree> getAggregationExpressions() {
-    return _aggregationExpressions;
+    return _requestInfo.getAggregationExpressions();
   }
 
   @Nullable
   public Set<String> getAggregationColumns() {
-    return _aggregationColumns;
+    return _requestInfo.getAggregationColumns();
   }
 
   @Nullable
   public Set<TransformExpressionTree> getGroupByExpressions() {
-    return _groupByExpressions;
+    return _requestInfo.getGroupByExpressions();
   }
 
   @Nullable
   public Set<String> getGroupByColumns() {
-    return _groupByColumns;
+    return _requestInfo.getGroupByColumns();
   }
 
   @Nullable
   public Set<String> getSelectionColumns() {
-    return _selectionColumns;
+    return _requestInfo.getSelectionColumns();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org