You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/03 22:41:09 UTC
(pinot) branch master updated: Ask server to directly return final result for queries hitting single server (#11938)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 37ee8f6414 Ask server to directly return final result for queries hitting single server (#11938)
37ee8f6414 is described below
commit 37ee8f6414179727f7c57ccad30fb7fd616c5952
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Nov 3 15:41:02 2023 -0700
Ask server to directly return final result for queries hitting single server (#11938)
---
.../requesthandler/BaseBrokerRequestHandler.java | 113 +++++++++++----------
.../blocks/results/AggregationResultsBlock.java | 2 +-
.../blocks/results/GroupByResultsBlock.java | 29 +++++-
.../function/AggregationFunctionUtils.java | 26 +++++
.../function/array/BaseArrayAggStringFunction.java | 6 +-
.../tests/OfflineClusterIntegrationTest.java | 3 -
.../apache/pinot/spi/utils/CommonConstants.java | 2 -
7 files changed, 118 insertions(+), 63 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 570748a74c..eed9ae56d4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -97,6 +97,7 @@ import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
@@ -684,7 +685,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
return new BrokerResponseNative(exceptions);
}
- // Set the maximum serialized response size per server
+ // Set the maximum serialized response size per server, and ask server to directly return final response when only
+ // one server is queried
int numServers = 0;
if (offlineRoutingTable != null) {
numServers += offlineRoutingTable.size();
@@ -692,14 +694,31 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (realtimeRoutingTable != null) {
numServers += realtimeRoutingTable.size();
}
-
if (offlineBrokerRequest != null) {
- setMaxServerResponseSizeBytes(numServers, offlineBrokerRequest.getPinotQuery().getQueryOptions(),
- offlineTableConfig);
+ Map<String, String> queryOptions = offlineBrokerRequest.getPinotQuery().getQueryOptions();
+ setMaxServerResponseSizeBytes(numServers, queryOptions, offlineTableConfig);
+ // Set the query option to directly return final result for single server query unless it is explicitly disabled
+ if (numServers == 1) {
+ // Set the same flag in the original server request to be used in the reduce phase for hybrid table
+ if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
+ && offlineBrokerRequest != serverBrokerRequest) {
+ serverBrokerRequest.getPinotQuery().getQueryOptions()
+ .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
+ }
+ }
}
if (realtimeBrokerRequest != null) {
- setMaxServerResponseSizeBytes(numServers, realtimeBrokerRequest.getPinotQuery().getQueryOptions(),
- realtimeTableConfig);
+ Map<String, String> queryOptions = realtimeBrokerRequest.getPinotQuery().getQueryOptions();
+ setMaxServerResponseSizeBytes(numServers, queryOptions, realtimeTableConfig);
+ // Set the query option to directly return final result for single server query unless it is explicitly disabled
+ if (numServers == 1) {
+ // Set the same flag in the original server request to be used in the reduce phase for hybrid table
+ if (queryOptions.putIfAbsent(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true") == null
+ && realtimeBrokerRequest != serverBrokerRequest) {
+ serverBrokerRequest.getPinotQuery().getQueryOptions()
+ .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
+ }
+ }
}
// Execute the query
@@ -1672,72 +1691,62 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
timeSpentMs, queryTimeoutMs, tableNameWithType);
throw new TimeoutException(errorMessage);
}
- queryOptions.put(Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs));
+ queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(remainingTimeMs));
return remainingTimeMs;
}
/**
* Sets a query option indicating the maximum response size that can be sent from a server to the broker. This size
* is measured for the serialized response.
+ *
+ * The overriding order of priority is:
+ * 1. QueryOption -> maxServerResponseSizeBytes
+ * 2. QueryOption -> maxQueryResponseSizeBytes
+ * 3. TableConfig -> maxServerResponseSizeBytes
+ * 4. TableConfig -> maxQueryResponseSizeBytes
+ * 5. BrokerConfig -> maxServerResponseSizeBytes
+ * 6. BrokerConfig -> maxQueryResponseSizeBytes
*/
private void setMaxServerResponseSizeBytes(int numServers, Map<String, String> queryOptions,
- TableConfig tableConfig) {
- if (numServers == 0) {
- return;
- }
-
- // The overriding order of priority is:
- // 1. QueryOption -> maxServerResponseSizeBytes
- // 2. QueryOption -> maxQueryResponseSizeBytes
- // 3. TableConfig -> maxServerResponseSizeBytes
- // 4. TableConfig -> maxQueryResponseSizeBytes
- // 5. BrokerConfig -> maxServerResponseSizeBytes
- // 6. BrokerConfig -> maxServerResponseSizeBytes
-
+ @Nullable TableConfig tableConfig) {
// QueryOption
if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) {
return;
}
- Long maxQueryResponseSizeQOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
- if (maxQueryResponseSizeQOption != null) {
- Long maxServerResponseSize = maxQueryResponseSizeQOption / numServers;
- queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
- Long.toString(maxServerResponseSize));
+ Long maxQueryResponseSizeQueryOption = QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
+ if (maxQueryResponseSizeQueryOption != null) {
+ queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxQueryResponseSizeQueryOption / numServers));
return;
}
// TableConfig
- Preconditions.checkState(tableConfig != null);
- QueryConfig queryConfig = tableConfig.getQueryConfig();
- if (queryConfig != null && queryConfig.getMaxServerResponseSizeBytes() != null) {
- Long maxServerResponseSize = queryConfig.getMaxServerResponseSizeBytes();
- queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
- Long.toString(maxServerResponseSize));
- return;
- }
- if (queryConfig != null && queryConfig.getMaxQueryResponseSizeBytes() != null) {
- Long maxQueryResponseSize = queryConfig.getMaxQueryResponseSizeBytes();
- Long maxServerResponseSize = maxQueryResponseSize / numServers;
- queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
- Long.toString(maxServerResponseSize));
- return;
+ if (tableConfig != null && tableConfig.getQueryConfig() != null) {
+ QueryConfig queryConfig = tableConfig.getQueryConfig();
+ if (queryConfig.getMaxServerResponseSizeBytes() != null) {
+ queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(queryConfig.getMaxServerResponseSizeBytes()));
+ return;
+ }
+ if (queryConfig.getMaxQueryResponseSizeBytes() != null) {
+ queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(queryConfig.getMaxQueryResponseSizeBytes() / numServers));
+ return;
+ }
}
// BrokerConfig
- Long maxServerResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES,
- Broker.DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES);
- Long maxQueryResponseSizeCfg = _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES,
- Broker.DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES);
-
- if (maxServerResponseSizeCfg > 0) {
- queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
- Long.toString(maxServerResponseSizeCfg));
+ Long maxServerResponseSizeBrokerConfig =
+ _config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES, Long.class);
+ if (maxServerResponseSizeBrokerConfig != null) {
+ queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, Long.toString(maxServerResponseSizeBrokerConfig));
return;
}
- if (maxQueryResponseSizeCfg > 0) {
- Long maxServerResponseSize = maxQueryResponseSizeCfg / numServers;
- queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
- Long.toString(maxServerResponseSize));
+ Long maxQueryResponseSizeBrokerConfig =
+ _config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES, Long.class);
+ if (maxQueryResponseSizeBrokerConfig != null) {
+ queryOptions.put(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxQueryResponseSizeBrokerConfig / numServers));
}
}
@@ -1769,7 +1778,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
numReplicaGroupsToQuery);
}
} catch (NumberFormatException e) {
- String numReplicaGroupsToQuery = queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
+ String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
throw new IllegalStateException(
String.format("numReplicaGroups must be a positive number, got: %s", numReplicaGroupsToQuery));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index cf13255f8b..8c3e025af3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -207,7 +207,7 @@ public class AggregationResultsBlock extends BaseResultsBlock {
dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
break;
case STRING_ARRAY:
- dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).toArray(new String[0]));
+ dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).elements());
break;
default:
throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 8302a7db28..c469363be9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -19,6 +19,10 @@
package org.apache.pinot.core.operator.blocks.results;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -240,13 +244,25 @@ public class GroupByResultsBlock extends BaseResultsBlock {
dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
break;
case INT_ARRAY:
- dataTableBuilder.setColumn(columnIndex, (int[]) value);
+ if (value instanceof IntArrayList) {
+ dataTableBuilder.setColumn(columnIndex, ((IntArrayList) value).elements());
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (int[]) value);
+ }
break;
case LONG_ARRAY:
- dataTableBuilder.setColumn(columnIndex, (long[]) value);
+ if (value instanceof LongArrayList) {
+ dataTableBuilder.setColumn(columnIndex, ((LongArrayList) value).elements());
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (long[]) value);
+ }
break;
case FLOAT_ARRAY:
- dataTableBuilder.setColumn(columnIndex, (float[]) value);
+ if (value instanceof FloatArrayList) {
+ dataTableBuilder.setColumn(columnIndex, ((FloatArrayList) value).elements());
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (float[]) value);
+ }
break;
case DOUBLE_ARRAY:
if (value instanceof DoubleArrayList) {
@@ -256,7 +272,12 @@ public class GroupByResultsBlock extends BaseResultsBlock {
}
break;
case STRING_ARRAY:
- dataTableBuilder.setColumn(columnIndex, (String[]) value);
+ if (value instanceof ObjectArrayList) {
+ //noinspection unchecked
+ dataTableBuilder.setColumn(columnIndex, ((ObjectArrayList<String>) value).elements());
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (String[]) value);
+ }
break;
case OBJECT:
dataTableBuilder.setColumn(columnIndex, value);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 7de88cb8c7..7a077ccbd5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -178,8 +178,34 @@ public class AggregationFunctionUtils {
return dataTable.getString(rowId, colId);
case BYTES:
return dataTable.getBytes(rowId, colId).getBytes();
+ case INT_ARRAY:
+ return dataTable.getIntArray(rowId, colId);
+ case LONG_ARRAY:
+ return dataTable.getLongArray(rowId, colId);
+ case FLOAT_ARRAY:
+ return dataTable.getFloatArray(rowId, colId);
case DOUBLE_ARRAY:
return dataTable.getDoubleArray(rowId, colId);
+ case BOOLEAN_ARRAY: {
+ int[] intValues = dataTable.getIntArray(rowId, colId);
+ int numValues = intValues.length;
+ boolean[] booleanValues = new boolean[numValues];
+ for (int i = 0; i < numValues; i++) {
+ booleanValues[i] = intValues[i] == 1;
+ }
+ return booleanValues;
+ }
+ case TIMESTAMP_ARRAY: {
+ long[] longValues = dataTable.getLongArray(rowId, colId);
+ int numValues = longValues.length;
+ Timestamp[] timestampValues = new Timestamp[numValues];
+ for (int i = 0; i < numValues; i++) {
+ timestampValues[i] = new Timestamp(longValues[i]);
+ }
+ return timestampValues;
+ }
+ case STRING_ARRAY:
+ return dataTable.getStringArray(rowId, colId);
default:
throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
index 5f5b98f7c8..e906015a86 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/array/BaseArrayAggStringFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function.array;
import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectIterators;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
@@ -93,6 +94,9 @@ public abstract class BaseArrayAggStringFunction<I extends AbstractObjectCollect
@Override
public ObjectArrayList<String> extractFinalResult(I stringArrayList) {
- return new ObjectArrayList<>(stringArrayList);
+ // NOTE: Wrap a String[] to work around the bug of ObjectArrayList constructor creating Object[] internally.
+ String[] stringArray = new String[stringArrayList.size()];
+ ObjectIterators.unwrap(stringArrayList.iterator(), stringArray);
+ return ObjectArrayList.wrap(stringArray);
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index e20ad0c9fd..09b7411d1c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -314,9 +314,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
@Override
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
- if (getNumServers() == 1) {
- pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery;
- }
super.testQuery(pinotQuery, h2Query);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fffa3ba17b..568b1e1505 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -324,11 +324,9 @@ public class CommonConstants {
// Broker config indicating the maximum serialized response size across all servers for a query. This value is
// equally divided across all servers processing the query.
public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES = "pinot.broker.max.query.response.size.bytes";
- public static final long DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES = Long.MAX_VALUE;
// Broker config indicating the maximum length of the serialized response per server for a query.
public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = "pinot.broker.max.server.response.size.bytes";
- public static final long DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES = Long.MAX_VALUE;
public static class Request {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org