You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/10/11 00:41:32 UTC
[incubator-pinot] branch master updated: Remove the temporary code
for the selection fix (#4696)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c37dc02 Remove the temporary code for the selection fix (#4696)
c37dc02 is described below
commit c37dc02c4259e7978feb1157fd96a169bff8109b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Oct 10 17:41:26 2019 -0700
Remove the temporary code for the selection fix (#4696)
Remove the temporary code introduced in the selection fix: #4629
---
.../core/query/reduce/BrokerReduceService.java | 189 +--------------------
1 file changed, 9 insertions(+), 180 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 4bd5f14..a205a5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -19,12 +19,10 @@
package org.apache.pinot.core.query.reduce;
import com.google.common.base.Preconditions;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -48,8 +46,6 @@ import org.apache.pinot.common.request.HavingFilterQuery;
import org.apache.pinot.common.request.HavingFilterQueryMap;
import org.apache.pinot.common.request.Selection;
import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.request.transform.TransformExpressionTree;
-import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.ServerInstance;
import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -76,6 +72,7 @@ import org.apache.pinot.core.util.GroupByUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* The <code>BrokerReduceService</code> class provides service to reduce data tables gathered from multiple servers
* to {@link BrokerResponseNative}.
@@ -231,12 +228,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
// For empty data table map, construct empty result using the cached data schema for selection query if exists
if (cachedDataSchema != null) {
if (brokerRequest.isSetSelections()) {
- List<String> selectionColumns =
- SelectionOperatorUtils.getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(),
- cachedDataSchema);
+ List<String> selectionColumns = SelectionOperatorUtils
+ .getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(), cachedDataSchema);
brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, new ArrayList<>(0)));
- } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions)
- && GroupByUtils.isResponseFormat(Request.SQL, queryOptions)) {
+ } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions) && GroupByUtils
+ .isResponseFormat(Request.SQL, queryOptions)) {
setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap, preserveType);
}
@@ -248,17 +244,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
if (selection != null) {
// Selection query
- // Temporary code to handle the selection query with different DataSchema returned from different servers
- // TODO: Remove the code after all servers are migrated to the current version
- DataSchema masterDataSchema = getCanonicalDataSchema(selection, cachedDataSchema);
- String[] columnNames = masterDataSchema.getColumnNames();
- for (Map.Entry<ServerInstance, DataTable> entry : dataTableMap.entrySet()) {
- entry.setValue(new CanonicalDataTable(entry.getValue(), columnNames));
- }
-
// For data table map with more than one data tables, remove conflicting data tables
if (dataTableMap.size() > 1) {
- List<String> droppedServers = removeConflictingResponses(masterDataSchema, dataTableMap);
+ List<String> droppedServers = removeConflictingResponses(cachedDataSchema, dataTableMap);
if (!droppedServers.isEmpty()) {
String errorMessage =
QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
@@ -267,12 +255,12 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
if (brokerMetrics != null) {
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
}
- brokerResponseNative.addToExceptions(
- new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+ brokerResponseNative
+ .addToExceptions(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
}
}
- setSelectionResults(brokerResponseNative, selection, dataTableMap, masterDataSchema, preserveType);
+ setSelectionResults(brokerResponseNative, selection, dataTableMap, cachedDataSchema, preserveType);
} else {
// Aggregation query
@@ -805,163 +793,4 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
"There should be minimum one aggregation function in the select list of a Group by query");
}
}
-
- /**
- * Following part are temporary code to handle the selection query with different DataSchema returned from different
- * servers.
- * TODO: Remove the code after all servers are migrated to the current version.
- */
-
- private static DataSchema getCanonicalDataSchema(Selection selection, DataSchema dataSchema) {
- String[] columnNames = dataSchema.getColumnNames();
- Map<String, Integer> columnToIndexMap = SelectionOperatorUtils.getColumnToIndexMap(columnNames);
- int numColumns = columnToIndexMap.size();
-
- Set<String> canonicalColumnSet = new HashSet<>();
- List<String> canonicalColumns = new ArrayList<>(numColumns);
-
- // Put order-by columns at the front
- List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
- if (sortSequence != null) {
- for (SelectionSort selectionSort : sortSequence) {
- String orderByColumn = selectionSort.getColumn();
- if (canonicalColumnSet.add(orderByColumn)) {
- canonicalColumns.add(orderByColumn);
- }
- }
- }
-
- List<String> selectionColumns = selection.getSelectionColumns();
- if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
- selectionColumns = new ArrayList<>(numColumns);
- for (String column : columnToIndexMap.keySet()) {
- if (TransformExpressionTree.compileToExpressionTree(column).getExpressionType()
- == TransformExpressionTree.ExpressionType.IDENTIFIER) {
- selectionColumns.add(column);
- }
- }
- selectionColumns.sort(null);
- }
-
- for (String selectionColumn : selectionColumns) {
- if (canonicalColumnSet.add(selectionColumn)) {
- canonicalColumns.add(selectionColumn);
- }
- }
-
- int numCanonicalColumns = canonicalColumns.size();
- String[] canonicalColumnNames = new String[numCanonicalColumns];
- DataSchema.ColumnDataType[] canonicalColumnDataTypes = new DataSchema.ColumnDataType[numCanonicalColumns];
- DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
- for (int i = 0; i < numCanonicalColumns; i++) {
- String canonicalColumn = canonicalColumns.get(i);
- canonicalColumnNames[i] = canonicalColumn;
- canonicalColumnDataTypes[i] = columnDataTypes[columnToIndexMap.get(canonicalColumn)];
- }
- return new DataSchema(canonicalColumnNames, canonicalColumnDataTypes);
- }
-
- private static class CanonicalDataTable implements DataTable {
- final DataTable _dataTable;
- final int[] _indexMap;
- final DataSchema _canonicalDataSchema;
-
- CanonicalDataTable(DataTable dataTable, String[] canonicalColumns) {
- _dataTable = dataTable;
- int numCanonicalColumns = canonicalColumns.length;
- _indexMap = new int[numCanonicalColumns];
- DataSchema.ColumnDataType[] canonicalColumnDataTypes = new DataSchema.ColumnDataType[numCanonicalColumns];
-
- DataSchema dataSchema = dataTable.getDataSchema();
- Map<String, Integer> columnToIndexMap = SelectionOperatorUtils.getColumnToIndexMap(dataSchema.getColumnNames());
- DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
- for (int i = 0; i < numCanonicalColumns; i++) {
- int columnIndex = columnToIndexMap.get(canonicalColumns[i]);
- _indexMap[i] = columnIndex;
- canonicalColumnDataTypes[i] = columnDataTypes[columnIndex];
- }
- _canonicalDataSchema = new DataSchema(canonicalColumns, canonicalColumnDataTypes);
- }
-
- @Override
- public void addException(ProcessingException processingException) {
- _dataTable.addException(processingException);
- }
-
- @Override
- public byte[] toBytes()
- throws IOException {
- return _dataTable.toBytes();
- }
-
- @Override
- public Map<String, String> getMetadata() {
- return _dataTable.getMetadata();
- }
-
- @Override
- public DataSchema getDataSchema() {
- return _canonicalDataSchema;
- }
-
- @Override
- public int getNumberOfRows() {
- return _dataTable.getNumberOfRows();
- }
-
- @Override
- public int getInt(int rowId, int colId) {
- return _dataTable.getInt(rowId, _indexMap[colId]);
- }
-
- @Override
- public long getLong(int rowId, int colId) {
- return _dataTable.getLong(rowId, _indexMap[colId]);
- }
-
- @Override
- public float getFloat(int rowId, int colId) {
- return _dataTable.getFloat(rowId, _indexMap[colId]);
- }
-
- @Override
- public double getDouble(int rowId, int colId) {
- return _dataTable.getDouble(rowId, _indexMap[colId]);
- }
-
- @Override
- public String getString(int rowId, int colId) {
- return _dataTable.getString(rowId, _indexMap[colId]);
- }
-
- @Override
- public <T> T getObject(int rowId, int colId) {
- return _dataTable.getObject(rowId, _indexMap[colId]);
- }
-
- @Override
- public int[] getIntArray(int rowId, int colId) {
- return _dataTable.getIntArray(rowId, _indexMap[colId]);
- }
-
- @Override
- public long[] getLongArray(int rowId, int colId) {
- return _dataTable.getLongArray(rowId, _indexMap[colId]);
- }
-
- @Override
- public float[] getFloatArray(int rowId, int colId) {
- return _dataTable.getFloatArray(rowId, _indexMap[colId]);
- }
-
- @Override
- public double[] getDoubleArray(int rowId, int colId) {
- return _dataTable.getDoubleArray(rowId, _indexMap[colId]);
- }
-
- @Override
- public String[] getStringArray(int rowId, int colId) {
- return _dataTable.getStringArray(rowId, _indexMap[colId]);
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org