You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/03/31 22:52:52 UTC
[pinot] branch master updated: Fix the group-by reduce handling when query times out (#8450)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8dcb255 Fix the group-by reduce handling when query times out (#8450)
8dcb255 is described below
commit 8dcb25580e21b001633b9931107ba0b5baa037d8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Mar 31 15:52:27 2022 -0700
Fix the group-by reduce handling when query times out (#8450)
When the broker times out, several places were not handled properly:
- The return value of `CountDownLatch.await()` should be checked to detect whether the query timed out
- The `Future[]` length should be the number of reduce groups (i.e. reduce threads), not the number of data tables
- When the query times out, the result table is not set, so reading rows from it can cause an NPE
---
.../core/query/reduce/GroupByDataTableReducer.java | 52 ++++++++------
.../core/query/reduce/BrokerReduceServiceTest.java | 79 ++++++++++++++++++++++
2 files changed, 111 insertions(+), 20 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index fd668c7..db01acd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.reduce;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -127,11 +128,11 @@ public class GroupByDataTableReducer implements DataTableReducer {
try {
setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
brokerMetrics);
+ resultSize = brokerResponseNative.getResultTable().getRows().size();
} catch (TimeoutException e) {
brokerResponseNative.getProcessingExceptions()
.add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
}
- resultSize = brokerResponseNative.getResultTable().getRows().size();
} else {
// 2. groupByMode = sql, responseFormat = pql
// This mode will invoke SQL style group by execution, but present results in PQL way
@@ -192,12 +193,18 @@ public class GroupByDataTableReducer implements DataTableReducer {
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
throws TimeoutException {
- IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
- if (brokerMetrics != null) {
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
- brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+ Iterator<Record> sortedIterator;
+ if (!dataTables.isEmpty()) {
+ IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+ if (brokerMetrics != null) {
+ brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+ brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+ }
+ sortedIterator = indexedTable.iterator();
+ } else {
+ sortedIterator = Collections.emptyIterator();
}
- Iterator<Record> sortedIterator = indexedTable.iterator();
+
DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
@@ -302,7 +309,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
int trimThreshold = reducerContext.getGroupByTrimThreshold();
IndexedTable indexedTable;
- if (numReduceThreadsToUse <= 1) {
+ if (numReduceThreadsToUse == 1) {
indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
} else {
if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
@@ -316,9 +323,6 @@ public class GroupByDataTableReducer implements DataTableReducer {
}
}
- Future[] futures = new Future[numDataTables];
- CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
-
// Create groups of data tables that each thread can process concurrently.
// Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
@@ -331,16 +335,21 @@ public class GroupByDataTableReducer implements DataTableReducer {
reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
}
- int cnt = 0;
+ Future[] futures = new Future[numReduceThreadsToUse];
+ CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
- for (List<DataTable> reduceGroup : reduceGroups) {
- futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+ for (int i = 0; i < numReduceThreadsToUse; i++) {
+ List<DataTable> reduceGroup = reduceGroups.get(i);
+ futures[i] = reducerContext.getExecutorService().submit(new TraceRunnable() {
@Override
public void runJob() {
for (DataTable dataTable : reduceGroup) {
- int numRows = dataTable.getNumberOfRows();
-
+ // Terminate when thread is interrupted. This is expected when the query already fails in the main thread.
+ if (Thread.interrupted()) {
+ return;
+ }
try {
+ int numRows = dataTable.getNumberOfRows();
for (int rowId = 0; rowId < numRows; rowId++) {
Object[] values = new Object[_numColumns];
for (int colId = 0; colId < _numColumns; colId++) {
@@ -383,14 +392,17 @@ public class GroupByDataTableReducer implements DataTableReducer {
try {
long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
- countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+ if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
+ throw new TimeoutException("Timed out in broker reduce phase");
+ }
} catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted in broker reduce phase", e);
+ } finally {
for (Future future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
- throw new TimeoutException("Timed out in broker reduce phase.");
}
indexedTable.finish(true);
@@ -412,10 +424,10 @@ public class GroupByDataTableReducer implements DataTableReducer {
private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
// Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
- return Math.min(1, numDataTables); // Number of data tables can be zero.
+ return 1;
+ } else {
+ return Math.min(numDataTables, maxReduceThreadsPerQuery);
}
-
- return Math.min(maxReduceThreadsPerQuery, numDataTables);
}
/**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
new file mode 100644
index 0000000..884f933
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class BrokerReduceServiceTest {
+ private static final CalciteSqlCompiler COMPILER = new CalciteSqlCompiler();
+
+ @Test
+ public void testReduceTimeout()
+ throws IOException {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2);
+ BrokerReduceService brokerReduceService = new BrokerReduceService(new PinotConfiguration(properties));
+
+ BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(
+ "SELECT COUNT(*) FROM testTable GROUP BY col1 OPTION(groupByMode=sql,responseFormat=sql)");
+ DataSchema dataSchema =
+ new DataSchema(new String[]{"col1", "count(*)"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ int numGroups = 5000;
+ for (int i = 0; i < numGroups; i++) {
+ dataTableBuilder.startRow();
+ dataTableBuilder.setColumn(0, i);
+ dataTableBuilder.setColumn(1, 1L);
+ dataTableBuilder.finishRow();
+ }
+ DataTable dataTable = dataTableBuilder.build();
+ Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+ int numInstances = 1000;
+ for (int i = 0; i < numInstances; i++) {
+ ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+ dataTableMap.put(instance, dataTable);
+ }
+ long reduceTimeoutMs = 1;
+ BrokerResponseNative brokerResponse =
+ brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, null);
+ List<QueryProcessingException> processingExceptions = brokerResponse.getProcessingExceptions();
+ assertEquals(processingExceptions.size(), 1);
+ assertEquals(processingExceptions.get(0).getErrorCode(), QueryException.BROKER_TIMEOUT_ERROR_CODE);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org