Posted to commits@pinot.apache.org by ja...@apache.org on 2019/10/10 21:05:12 UTC

[incubator-pinot] branch remove_selection_fix_temporary_code created (now cf15849)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch remove_selection_fix_temporary_code
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at cf15849  Remove the temporary code for the selection fix

This branch includes the following new commits:

     new cf15849  Remove the temporary code for the selection fix

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Remove the temporary code for the selection fix

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch remove_selection_fix_temporary_code
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit cf158497787aba94d741869d7676b069c7add2bc
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Oct 10 14:03:21 2019 -0700

    Remove the temporary code for the selection fix
    
    Remove the temporary code introduced in the selection fix: #4629
---
 .../core/query/reduce/BrokerReduceService.java     | 189 +--------------------
 1 file changed, 9 insertions(+), 180 deletions(-)
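
With the shim gone, the broker reduces selection responses against the cached
DataSchema directly and drops any server response whose schema conflicts with it
(see the removeConflictingResponses call in the diff below). The following is a
minimal, illustrative sketch of that kind of check, not the actual Pinot
implementation; the helper name dropConflictingResponses is hypothetical, and it
assumes the imports already present in BrokerReduceService.java.

  // Illustrative sketch only (hypothetical helper, not Pinot's removeConflictingResponses):
  // once every server reports the same schema, the broker can treat the cached DataSchema as
  // the single source of truth and drop any response whose schema disagrees with it.
  private static List<String> dropConflictingResponses(DataSchema cachedDataSchema,
      Map<ServerInstance, DataTable> dataTableMap) {
    List<String> droppedServers = new ArrayList<>();
    Iterator<Map.Entry<ServerInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<ServerInstance, DataTable> entry = iterator.next();
      if (!entry.getValue().getDataSchema().equals(cachedDataSchema)) {
        droppedServers.add(entry.getKey().toString());
        iterator.remove();
      }
    }
    return droppedServers;
  }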

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 4bd5f14..a205a5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -19,12 +19,10 @@
 package org.apache.pinot.core.query.reduce;
 
 import com.google.common.base.Preconditions;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,8 +46,6 @@ import org.apache.pinot.common.request.HavingFilterQuery;
 import org.apache.pinot.common.request.HavingFilterQueryMap;
 import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.request.transform.TransformExpressionTree;
-import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.response.ServerInstance;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -76,6 +72,7 @@ import org.apache.pinot.core.util.GroupByUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * The <code>BrokerReduceService</code> class provides service to reduce data tables gathered from multiple servers
  * to {@link BrokerResponseNative}.
@@ -231,12 +228,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
       // For empty data table map, construct empty result using the cached data schema for selection query if exists
       if (cachedDataSchema != null) {
         if (brokerRequest.isSetSelections()) {
-          List<String> selectionColumns =
-              SelectionOperatorUtils.getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(),
-                  cachedDataSchema);
+          List<String> selectionColumns = SelectionOperatorUtils
+              .getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(), cachedDataSchema);
           brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, new ArrayList<>(0)));
-        } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions)
-            && GroupByUtils.isResponseFormat(Request.SQL, queryOptions)) {
+        } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions) && GroupByUtils
+            .isResponseFormat(Request.SQL, queryOptions)) {
           setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
               brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap, preserveType);
         }
@@ -248,17 +244,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
       if (selection != null) {
         // Selection query
 
-        // Temporary code to handle the selection query with different DataSchema returned from different servers
-        // TODO: Remove the code after all servers are migrated to the current version
-        DataSchema masterDataSchema = getCanonicalDataSchema(selection, cachedDataSchema);
-        String[] columnNames = masterDataSchema.getColumnNames();
-        for (Map.Entry<ServerInstance, DataTable> entry : dataTableMap.entrySet()) {
-          entry.setValue(new CanonicalDataTable(entry.getValue(), columnNames));
-        }
-
         // For data table map with more than one data tables, remove conflicting data tables
         if (dataTableMap.size() > 1) {
-          List<String> droppedServers = removeConflictingResponses(masterDataSchema, dataTableMap);
+          List<String> droppedServers = removeConflictingResponses(cachedDataSchema, dataTableMap);
           if (!droppedServers.isEmpty()) {
             String errorMessage =
                 QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
@@ -267,12 +255,12 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
             if (brokerMetrics != null) {
               brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
             }
-            brokerResponseNative.addToExceptions(
-                new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+            brokerResponseNative
+                .addToExceptions(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
           }
         }
 
-        setSelectionResults(brokerResponseNative, selection, dataTableMap, masterDataSchema, preserveType);
+        setSelectionResults(brokerResponseNative, selection, dataTableMap, cachedDataSchema, preserveType);
       } else {
         // Aggregation query
 
@@ -805,163 +793,4 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
           "There should be minimum one aggregation function in the select list of a Group by query");
     }
   }
-
-  /**
-   * Following part are temporary code to handle the selection query with different DataSchema returned from different
-   * servers.
-   * TODO: Remove the code after all servers are migrated to the current version.
-   */
-
-  private static DataSchema getCanonicalDataSchema(Selection selection, DataSchema dataSchema) {
-    String[] columnNames = dataSchema.getColumnNames();
-    Map<String, Integer> columnToIndexMap = SelectionOperatorUtils.getColumnToIndexMap(columnNames);
-    int numColumns = columnToIndexMap.size();
-
-    Set<String> canonicalColumnSet = new HashSet<>();
-    List<String> canonicalColumns = new ArrayList<>(numColumns);
-
-    // Put order-by columns at the front
-    List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
-    if (sortSequence != null) {
-      for (SelectionSort selectionSort : sortSequence) {
-        String orderByColumn = selectionSort.getColumn();
-        if (canonicalColumnSet.add(orderByColumn)) {
-          canonicalColumns.add(orderByColumn);
-        }
-      }
-    }
-
-    List<String> selectionColumns = selection.getSelectionColumns();
-    if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
-      selectionColumns = new ArrayList<>(numColumns);
-      for (String column : columnToIndexMap.keySet()) {
-        if (TransformExpressionTree.compileToExpressionTree(column).getExpressionType()
-            == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-          selectionColumns.add(column);
-        }
-      }
-      selectionColumns.sort(null);
-    }
-
-    for (String selectionColumn : selectionColumns) {
-      if (canonicalColumnSet.add(selectionColumn)) {
-        canonicalColumns.add(selectionColumn);
-      }
-    }
-
-    int numCanonicalColumns = canonicalColumns.size();
-    String[] canonicalColumnNames = new String[numCanonicalColumns];
-    DataSchema.ColumnDataType[] canonicalColumnDataTypes = new DataSchema.ColumnDataType[numCanonicalColumns];
-    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-    for (int i = 0; i < numCanonicalColumns; i++) {
-      String canonicalColumn = canonicalColumns.get(i);
-      canonicalColumnNames[i] = canonicalColumn;
-      canonicalColumnDataTypes[i] = columnDataTypes[columnToIndexMap.get(canonicalColumn)];
-    }
-    return new DataSchema(canonicalColumnNames, canonicalColumnDataTypes);
-  }
-
-  private static class CanonicalDataTable implements DataTable {
-    final DataTable _dataTable;
-    final int[] _indexMap;
-    final DataSchema _canonicalDataSchema;
-
-    CanonicalDataTable(DataTable dataTable, String[] canonicalColumns) {
-      _dataTable = dataTable;
-      int numCanonicalColumns = canonicalColumns.length;
-      _indexMap = new int[numCanonicalColumns];
-      DataSchema.ColumnDataType[] canonicalColumnDataTypes = new DataSchema.ColumnDataType[numCanonicalColumns];
-
-      DataSchema dataSchema = dataTable.getDataSchema();
-      Map<String, Integer> columnToIndexMap = SelectionOperatorUtils.getColumnToIndexMap(dataSchema.getColumnNames());
-      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      for (int i = 0; i < numCanonicalColumns; i++) {
-        int columnIndex = columnToIndexMap.get(canonicalColumns[i]);
-        _indexMap[i] = columnIndex;
-        canonicalColumnDataTypes[i] = columnDataTypes[columnIndex];
-      }
-      _canonicalDataSchema = new DataSchema(canonicalColumns, canonicalColumnDataTypes);
-    }
-
-    @Override
-    public void addException(ProcessingException processingException) {
-      _dataTable.addException(processingException);
-    }
-
-    @Override
-    public byte[] toBytes()
-        throws IOException {
-      return _dataTable.toBytes();
-    }
-
-    @Override
-    public Map<String, String> getMetadata() {
-      return _dataTable.getMetadata();
-    }
-
-    @Override
-    public DataSchema getDataSchema() {
-      return _canonicalDataSchema;
-    }
-
-    @Override
-    public int getNumberOfRows() {
-      return _dataTable.getNumberOfRows();
-    }
-
-    @Override
-    public int getInt(int rowId, int colId) {
-      return _dataTable.getInt(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public long getLong(int rowId, int colId) {
-      return _dataTable.getLong(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public float getFloat(int rowId, int colId) {
-      return _dataTable.getFloat(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public double getDouble(int rowId, int colId) {
-      return _dataTable.getDouble(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public String getString(int rowId, int colId) {
-      return _dataTable.getString(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public <T> T getObject(int rowId, int colId) {
-      return _dataTable.getObject(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public int[] getIntArray(int rowId, int colId) {
-      return _dataTable.getIntArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public long[] getLongArray(int rowId, int colId) {
-      return _dataTable.getLongArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public float[] getFloatArray(int rowId, int colId) {
-      return _dataTable.getFloatArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public double[] getDoubleArray(int rowId, int colId) {
-      return _dataTable.getDoubleArray(rowId, _indexMap[colId]);
-    }
-
-    @Override
-    public String[] getStringArray(int rowId, int colId) {
-      return _dataTable.getStringArray(rowId, _indexMap[colId]);
-    }
-  }
 }
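
For context on what the deleted block above did: each server's DataTable was
wrapped in a delegating view that remapped column indices to a canonical column
order (order-by columns first, then selection columns), so responses whose
schemas listed the same columns in different orders could still be merged. Below
is a condensed sketch of that wrapper pattern; the class name ReorderingView is
illustrative, while the real class, CanonicalDataTable, implemented the full
DataTable interface and is exactly the code removed above.

  // Condensed sketch of the deleted shim: delegate every accessor to the wrapped table,
  // but translate the caller's column index through a canonical -> actual index map.
  // (The removed CanonicalDataTable did this for every DataTable accessor.)
  static final class ReorderingView {
    private final DataTable _delegate;
    private final int[] _indexMap;  // canonical column position -> position in the wrapped table

    ReorderingView(DataTable delegate, String[] canonicalColumns) {
      _delegate = delegate;
      Map<String, Integer> columnToIndexMap =
          SelectionOperatorUtils.getColumnToIndexMap(delegate.getDataSchema().getColumnNames());
      _indexMap = new int[canonicalColumns.length];
      for (int i = 0; i < canonicalColumns.length; i++) {
        _indexMap[i] = columnToIndexMap.get(canonicalColumns[i]);
      }
    }

    int getInt(int rowId, int colId) {
      // The same one-line delegation was repeated for long/float/double/String and array getters.
      return _delegate.getInt(rowId, _indexMap[colId]);
    }
  }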


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org