You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/03 22:41:09 UTC

(pinot) branch master updated: Ask server to directly return final result for queries hitting single server (#11938)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ee8f6414 Ask server to directly return final result for queries hitting single server (#11938)
37ee8f6414 is described below

commit 37ee8f6414179727f7c57ccad30fb7fd616c5952
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Nov 3 15:41:02 2023 -0700

    Ask server to directly return final result for queries hitting single server (#11938)
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 113 +++++++++++----------
 .../blocks/results/AggregationResultsBlock.java    |   2 +-
 .../blocks/results/GroupByResultsBlock.java        |  29 +++++-
 .../function/AggregationFunctionUtils.java         |  26 +++++
 .../function/array/BaseArrayAggStringFunction.java |   6 +-
 .../tests/OfflineClusterIntegrationTest.java       |   3 -
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 -
 7 files changed, 118 insertions(+), 63 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 570748a74c..eed9ae56d4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -97,6 +97,7 @@ import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.FilterKind;
@@ -684,7 +685,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         return new BrokerResponseNative(exceptions);
       }
 
-      // Set the maximum serialized response size per server
+      // Set the maximum serialized response size per server, and ask server to directly return final response when only
+      // one server is queried
       int numServers = 0;
       if (offlineRoutingTable != null) {
         numServers += offlineRoutingTable.size();
@@ -692,14 +694,31 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       if (realtimeRoutingTable != null) {
         numServers += realtimeRoutingTable.size();
       }
-
       if (offlineBrokerRequest != null) {
-        setMaxServerResponseSizeBytes(numServers, offlineBrokerRequest.getPinotQuery().getQueryOptions(),
-            offlineTableConfig);
+        Map<String, String> queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions();
+        setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig);
+        // Set the query option to directly return final result for single server query unless it is explicitly disabled
+        if (numServers == 1) {
+          // Set the same flag in the original server request to be used in the reduce phase for hybrid table
+          if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
+              && offlineBrokerRequest != serverBrokerRequest) {
+            serverBrokerRequest.getPinotQuery().getQueryOptions()
+                .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
+          }
+        }
       }
       if (realtimeBrokerRequest != null) {
-        setMaxServerResponseSizeBytes(numServers, realtimeBrokerRequest.getPinotQuery().getQueryOptions(),
-            realtimeTableConfig);
+        Map<String, String> queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions();
+        setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig);
+        // Set the query option to directly return final result for single server query unless it is explicitly disabled
+        if (numServers == 1) {
+          // Set the same flag in the original server request to be used in the reduce phase for hybrid table
+          if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
+              && realtimeBrokerRequest != serverBrokerRequest) {
+            serverBrokerRequest.getPinotQuery().getQueryOptions()
+                .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
+          }
+        }
       }
 
       // Execute the query
@@ -1672,72 +1691,62 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
               timeSpentMs, queryTimeoutMs, tableNameWithType);
       throw new TimeoutException(errorMessage);
     }
-    queryOptions.put(Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs));
+    queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs));
     return remainingTimeMs;
   }
 
   /**
    * Sets a query option indicating the maximum response size that can be sent from a server to the broker. This size
    * is measured for the serialized response.
+   *
+   * The overriding order of priority is:
+   * 1. QueryOption  -> maxServerResponseSizeBytes
+   * 2. QueryOption  -> maxQueryResponseSizeBytes
+   * 3. TableConfig  -> maxServerResponseSizeBytes
+   * 4. TableConfig  -> maxQueryResponseSizeBytes
+   * 5. BrokerConfig -> maxServerResponseSizeBytes
+   * 6. BrokerConfig -> maxQueryResponseSizeBytes
    */
   private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> queryOptions,
-      TableConfig tableConfig) {
-    if (numServers == 0) {
-      return;
-    }
-
-    // The overriding order of priority is:
-    // 1. QueryOption  -> maxServerResponseSizeBytes
-    // 2. QueryOption  -> maxQueryResponseSizeBytes
-    // 3. TableConfig  -> maxServerResponseSizeBytes
-    // 4. TableConfig  -> maxQueryResponseSizeBytes
-    // 5. BrokerConfig -> maxServerResponseSizeBytes
-    // 6. BrokerConfig -> maxServerResponseSizeBytes
-
+      @Nullable TableConfig tableConfig) {
     // QueryOption
     if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) {
       return;
     }
-    Long maxQueryResponseSizeQOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
-    if (maxQueryResponseSizeQOption != null) {
-      Long maxServerResponseSize = maxQueryResponseSizeQOption / numServers;
-      queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
-          Long.toString(maxServerResponseSize));
+    Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
+    if (maxQueryResponseSizeQueryOption != null) {
+      queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxQueryResponseSizeQueryOption / numServers));
       return;
     }
 
     // TableConfig
-    Preconditions.checkState(tableConfig != null);
-    QueryConfig queryConfig = tableConfig.getQueryConfig();
-    if (queryConfig != null && queryConfig.getMaxServerResponseSizeBytes() != null) {
-      Long maxServerResponseSize = queryConfig.getMaxServerResponseSizeBytes();
-      queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
-          Long.toString(maxServerResponseSize));
-      return;
-    }
-    if (queryConfig != null && queryConfig.getMaxQueryResponseSizeBytes() != null) {
-      Long maxQueryResponseSize = queryConfig.getMaxQueryResponseSizeBytes();
-      Long maxServerResponseSize = maxQueryResponseSize / numServers;
-      queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
-          Long.toString(maxServerResponseSize));
-      return;
+    if (tableConfig != null && tableConfig.getQueryConfig() != null) {
+      QueryConfig queryConfig = tableConfig.getQueryConfig();
+      if (queryConfig.getMaxServerResponseSizeBytes() != null) {
+        queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+            Long.toString(queryConfig.getMaxServerResponseSizeBytes()));
+        return;
+      }
+      if (queryConfig.getMaxQueryResponseSizeBytes() != null) {
+        queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+            Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers));
+        return;
+      }
     }
 
     // BrokerConfig
-    Long maxServerResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES,
-        Broker.DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES);
-    Long maxQueryResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES,
-        Broker.DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES);
-
-    if (maxServerResponseSizeCfg > 0) {
-      queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
-          Long.toString(maxServerResponseSizeCfg));
+    Long maxServerResponseSizeBrokerConfig =
+        _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES, Long.class);
+    if (maxServerResponseSizeBrokerConfig != null) {
+      queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, Long.toString(maxServerResponseSizeBrokerConfig));
       return;
     }
-    if (maxQueryResponseSizeCfg > 0) {
-      Long maxServerResponseSize = maxQueryResponseSizeCfg / numServers;
-      queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
-          Long.toString(maxServerResponseSize));
+    Long maxQueryResponseSizeBrokerConfig =
+        _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES, Long.class);
+    if (maxQueryResponseSizeBrokerConfig != null) {
+      queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxQueryResponseSizeBrokerConfig / numServers));
     }
   }
 
@@ -1769,7 +1778,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
             numReplicaGroupsToQuery);
       }
     } catch (NumberFormatException e) {
-      String numReplicaGroupsToQuery = queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
+      String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
       throw new IllegalStateException(
           String.format("numReplicaGroups must be a positive number, got: %s", numReplicaGroupsToQuery));
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index cf13255f8b..8c3e025af3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -207,7 +207,7 @@ public class AggregationResultsBlock extends BaseResultsBlock {
         dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
         break;
       case STRING_ARRAY:
-        dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).toArray(new String[0]));
+        dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).elements());
         break;
       default:
         throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 8302a7db28..c469363be9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.core.operator.blocks.results;
 
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -240,13 +244,25 @@ public class GroupByResultsBlock extends BaseResultsBlock {
         dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
         break;
       case INT_ARRAY:
-        dataTableBuilder.setColumn(columnIndex, (int[]) value);
+        if (value instanceof IntArrayList) {
+          dataTableBuilder.setColumn(columnIndex, ((IntArrayList) value).elements());
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (int[]) value);
+        }
         break;
       case LONG_ARRAY:
-        dataTableBuilder.setColumn(columnIndex, (long[]) value);
+        if (value instanceof LongArrayList) {
+          dataTableBuilder.setColumn(columnIndex, ((LongArrayList) value).elements());
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (long[]) value);
+        }
         break;
       case FLOAT_ARRAY:
-        dataTableBuilder.setColumn(columnIndex, (float[]) value);
+        if (value instanceof FloatArrayList) {
+          dataTableBuilder.setColumn(columnIndex, ((FloatArrayList) value).elements());
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (float[]) value);
+        }
         break;
       case DOUBLE_ARRAY:
         if (value instanceof DoubleArrayList) {
@@ -256,7 +272,12 @@ public class GroupByResultsBlock extends BaseResultsBlock {
         }
         break;
       case STRING_ARRAY:
-        dataTableBuilder.setColumn(columnIndex, (String[]) value);
+        if (value instanceof ObjectArrayList) {
+          //noinspection unchecked
+          dataTableBuilder.setColumn(columnIndex, ((ObjectArrayList<String>) value).elements());
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (String[]) value);
+        }
         break;
       case OBJECT:
         dataTableBuilder.setColumn(columnIndex, value);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 7de88cb8c7..7a077ccbd5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -178,8 +178,34 @@ public class AggregationFunctionUtils {
         return dataTable.getString(rowId, colId);
       case BYTES:
         return dataTable.getBytes(rowId, colId).getBytes();
+      case INT_ARRAY:
+        return dataTable.getIntArray(rowId, colId);
+      case LONG_ARRAY:
+        return dataTable.getLongArray(rowId, colId);
+      case FLOAT_ARRAY:
+        return dataTable.getFloatArray(rowId, colId);
       case DOUBLE_ARRAY:
         return dataTable.getDoubleArray(rowId, colId);
+      case BOOLEAN_ARRAY: {
+        int[] intValues = dataTable.getIntArray(rowId, colId);
+        int numValues = intValues.length;
+        boolean[] booleanValues = new boolean[numValues];
+        for (int i = 0; i < numValues; i++) {
+          booleanValues[i] = intValues[i] == 1;
+        }
+        return booleanValues;
+      }
+      case TIMESTAMP_ARRAY: {
+        long[] longValues = dataTable.getLongArray(rowId, colId);
+        int numValues = longValues.length;
+        Timestamp[] timestampValues = new Timestamp[numValues];
+        for (int i = 0; i < numValues; i++) {
+          timestampValues[i] = new Timestamp(longValues[i]);
+        }
+        return timestampValues;
+      }
+      case STRING_ARRAY:
+        return dataTable.getStringArray(rowId, colId);
       default:
         throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
index 5f5b98f7c8..e906015a86 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function.array;
 
 import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
 import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectIterators;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
@@ -93,6 +94,9 @@ public abstract class BaseArrayAggStringFunction<I extends AbstractObjectCollect
 
   @Override
   public ObjectArrayList<String> extractFinalResult(I stringArrayList) {
-    return new ObjectArrayList<>(stringArrayList);
+    // NOTE: Wrap a String[] to work around the bug of ObjectArrayList constructor creating Object[] internally.
+    String[] stringArray = new String[stringArrayList.size()];
+    ObjectIterators.unwrap(stringArrayList.iterator(), stringArray);
+    return ObjectArrayList.wrap(stringArray);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index e20ad0c9fd..09b7411d1c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -314,9 +314,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Override
   protected void testQuery(String pinotQuery, String h2Query)
       throws Exception {
-    if (getNumServers() == 1) {
-      pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery;
-    }
     super.testQuery(pinotQuery, h2Query);
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fffa3ba17b..568b1e1505 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -324,11 +324,9 @@ public class CommonConstants {
     // Broker config indicating the maximum serialized response size across all servers for a query. This value is
     // equally divided across all servers processing the query.
     public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES = "pinot.broker.max.query.response.size.bytes";
-    public static final long DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES = Long.MAX_VALUE;
 
     // Broker config indicating the maximum length of the serialized response per server for a query.
     public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = "pinot.broker.max.server.response.size.bytes";
-    public static final long DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES = Long.MAX_VALUE;
 
 
     public static class Request {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org