You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/10/11 00:41:32 UTC

[incubator-pinot] branch master updated: Remove the temporary code for the selection fix (#4696)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c37dc02  Remove the temporary code for the selection fix (#4696)
c37dc02 is described below

commit c37dc02c4259e7978feb1157fd96a169bff8109b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Oct 10 17:41:26 2019 -0700

    Remove the temporary code for the selection fix (#4696)
    
    Remove the temporary code introduced in the selection fix: #4629
---
 .../core/query/reduce/BrokerReduceService.java     | 189 +--------------------
 1 file changed, 9 insertions(+), 180 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 4bd5f14..a205a5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -19,12 +19,10 @@
 package org.apache.pinot.core.query.reduce;
 
 import com.google.common.base.Preconditions;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,8 +46,6 @@ import org.apache.pinot.common.request.HavingFilterQuery;
 import org.apache.pinot.common.request.HavingFilterQueryMap;
 import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.request.transform.TransformExpressionTree;
-import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.response.ServerInstance;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -76,6 +72,7 @@ import org.apache.pinot.core.util.GroupByUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * The <code>BrokerReduceService</code> class provides service to reduce data tables gathered from multiple servers
  * to {@link BrokerResponseNative}.
@@ -231,12 +228,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
       // For empty data table map, construct empty result using the cached data schema for selection query if exists
       if (cachedDataSchema != null) {
         if (brokerRequest.isSetSelections()) {
-          List<String> selectionColumns =
-              SelectionOperatorUtils.getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(),
-                  cachedDataSchema);
+          List<String> selectionColumns = SelectionOperatorUtils
+              .getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(), cachedDataSchema);
           brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, new ArrayList<>(0)));
-        } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions)
-            && GroupByUtils.isResponseFormat(Request.SQL, queryOptions)) {
+        } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions) && GroupByUtils
+            .isResponseFormat(Request.SQL, queryOptions)) {
           setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
               brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap, preserveType);
         }
@@ -248,17 +244,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
       if (selection != null) {
         // Selection query
 
-        // Temporary code to handle the selection query with different DataSchema returned from different servers
-        // TODO: Remove the code after all servers are migrated to the current version
-        DataSchema masterDataSchema = getCanonicalDataSchema(selection, cachedDataSchema);
-        String[] columnNames = masterDataSchema.getColumnNames();
-        for (Map.Entry<ServerInstance, DataTable> entry : dataTableMap.entrySet()) {
-          entry.setValue(new CanonicalDataTable(entry.getValue(), columnNames));
-        }
-
         // For data table map with more than one data tables, remove conflicting data tables
         if (dataTableMap.size() > 1) {
-          List<String> droppedServers = removeConflictingResponses(masterDataSchema, dataTableMap);
+          List<String> droppedServers = removeConflictingResponses(cachedDataSchema, dataTableMap);
           if (!droppedServers.isEmpty()) {
             String errorMessage =
                 QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
@@ -267,12 +255,12 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
             if (brokerMetrics != null) {
               brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
             }
-            brokerResponseNative.addToExceptions(
-                new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+            brokerResponseNative
+                .addToExceptions(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
           }
         }
 
-        setSelectionResults(brokerResponseNative, selection, dataTableMap, masterDataSchema, preserveType);
+        setSelectionResults(brokerResponseNative, selection, dataTableMap, cachedDataSchema, preserveType);
       } else {
         // Aggregation query
 
@@ -805,163 +793,4 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
           "There should be minimum one aggregation function in the select list of a Group by query");
     }
   }
-
-  /**
-   * Following part are temporary code to handle the selection query with different DataSchema returned from different
-   * servers.
-   * TODO: Remove the code after all servers are migrated to the current version.
-   */
-
-  private static DataSchema getCanonicalDataSchema(Selection selection, DataSchema dataSchema) {
-    String[] columnNames = dataSchema.getColumnNames();
-    Map<String, Integer> columnToIndexMap = SelectionOperatorUtils.getColumnToIndexMap(columnNames);
-    int numColumns = columnToIndexMap.size();
-
-    Set<String> canonicalColumnSet = new HashSet<>();
-    List<String> canonicalColumns = new ArrayList<>(numColumns);
-
-    // Put order-by columns at the front
-    List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
-    if (sortSequence != null) {
-      for (SelectionSort selectionSort : sortSequence) {
-        String orderByColumn = selectionSort.getColumn();
-        if (canonicalColumnSet.add(orderByColumn)) {
-          canonicalColumns.add(orderByColumn);
-        }
-      }
-    }
-
-    List<String> selectionColumns = selection.getSelectionColumns();
-    if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
-      selectionColumns = new ArrayList<>(numColumns);
-      for (String column : columnToIndexMap.keySet()) {
-        if (TransformExpressionTree.compileToExpressionTree(column).getExpressionType()
-            == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-          selectionColumns.add(column);
-        }
-      }
-      selectionColumns.sort(null);
-    }
-
-    for (String selectionColumn : selectionColumns) {
-      if (canonicalColumnSet.add(selectionColumn)) {
-        canonicalColumns.add(selectionColumn);
-      }
-    }
-
-    int numCanonicalColumns = canonicalColumns.size();
-    String[] canonicalColumnNames = new String[numCanonicalColumns];
-    DataSchema.ColumnDataType[] canonicalColumnDataTypes = new DataSchema.ColumnDataType[numCanonicalColumns];
-    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    for (int i = 0; i < numCanonicalColumns; i++) {
-      String canonicalColumn = canonicalColumns.get(i);
-      canonicalColumnNames[i] = canonicalColumn;
-      canonicalColumnDataTypes[i] = columnDataTypes[columnToIndexMap.get(canonicalColumn)];
-    }
-    return new DataSchema(canonicalColumnNames, canonicalColumnDataTypes);
-  }
-
-  private static class CanonicalDataTable implements DataTable {
-    final DataTable _dataTable;
-    final int[] _indexMap;
-    final DataSchema _canonicalDataSchema;
-
-    CanonicalDataTable(DataTable dataTable, String[] canonicalColumns) {
-      _dataTable = dataTable;
-      int numCanonicalColumns = canonicalColumns.length;
-      _indexMap = new int[numCanonicalColumns];
-      DataSchema.ColumnDataType[] canonicalColumnDataTypes = new DataSchema.ColumnDataType[numCanonicalColumns];
-
-      DataSchema dataSchema = dataTable.getDataSchema();
-      Map<String, Integer> columnToIndexMap = SelectionOperatorUtils.getColumnToIndexMap(dataSchema.getColumnNames());
-      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      for (int i = 0; i < numCanonicalColumns; i++) {
-        int columnIndex = columnToIndexMap.get(canonicalColumns[i]);
-        _indexMap[i] = columnIndex;
-        canonicalColumnDataTypes[i] = columnDataTypes[columnIndex];
-      }
-      _canonicalDataSchema = new DataSchema(canonicalColumns, canonicalColumnDataTypes);
-    }
-
-    @Override
-    public void addException(ProcessingException processingException) {
-      _dataTable.addException(processingException);
-    }
-
-    @Override
-    public byte[] toBytes()
-        throws IOException {
-      return _dataTable.toBytes();
-    }
-
-    @Override
-    public Map<String, String> getMetadata() {
-      return _dataTable.getMetadata();
-    }
-
-    @Override
-    public DataSchema getDataSchema() {
-      return _canonicalDataSchema;
-    }
-
-    @Override
-    public int getNumberOfRows() {
-      return _dataTable.getNumberOfRows();
-    }
-
-    @Override
-    public int getInt(int rowId, int colId) {
-      return _dataTable.getInt(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public long getLong(int rowId, int colId) {
-      return _dataTable.getLong(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public float getFloat(int rowId, int colId) {
-      return _dataTable.getFloat(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public double getDouble(int rowId, int colId) {
-      return _dataTable.getDouble(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public String getString(int rowId, int colId) {
-      return _dataTable.getString(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public <T> T getObject(int rowId, int colId) {
-      return _dataTable.getObject(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public int[] getIntArray(int rowId, int colId) {
-      return _dataTable.getIntArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public long[] getLongArray(int rowId, int colId) {
-      return _dataTable.getLongArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public float[] getFloatArray(int rowId, int colId) {
-      return _dataTable.getFloatArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public double[] getDoubleArray(int rowId, int colId) {
-      return _dataTable.getDoubleArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public String[] getStringArray(int rowId, int colId) {
-      return _dataTable.getStringArray(rowId, _indexMap[colId]);
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org