Posted to commits@pinot.apache.org by ja...@apache.org on 2022/03/31 22:52:52 UTC

[pinot] branch master updated: Fix the group-by reduce handling when query times out (#8450)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dcb255  Fix the group-by reduce handling when query times out (#8450)
8dcb255 is described below

commit 8dcb25580e21b001633b9931107ba0b5baa037d8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Mar 31 15:52:27 2022 -0700

    Fix the group-by reduce handling when query times out (#8450)
    
    When the broker times out, several places were not handled properly (see the sketch after this list):
    - The return value of `CountDownLatch.await()` should be checked to detect whether the query timed out
    - The `Future[]` should be sized to the number of reduce groups, not the number of data tables
    - When the query times out, the result table is never set, so reading rows from it can cause an NPE
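    Below is a minimal, self-contained sketch of the corrected pattern: futures sized
    by the number of reduce groups, the `await()` return value checked, and unfinished
    workers always cancelled in a `finally` block. All identifiers here
    (ReduceTimeoutSketch, executor, reduceGroups, and the sleep standing in for the
    per-table merge work) are illustrative placeholders, not Pinot's actual code:

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.*;

        public class ReduceTimeoutSketch {
          public static void main(String[] args) throws Exception {
            int numDataTables = 8;
            int numReduceThreads = 2;
            ExecutorService executor = Executors.newFixedThreadPool(numReduceThreads);

            // Group the "data tables" so each reduce thread handles one group.
            List<List<Integer>> reduceGroups = new ArrayList<>();
            for (int i = 0; i < numReduceThreads; i++) {
              reduceGroups.add(new ArrayList<>());
            }
            for (int i = 0; i < numDataTables; i++) {
              reduceGroups.get(i % numReduceThreads).add(i);
            }

            // One future per reduce group (not per data table).
            Future<?>[] futures = new Future<?>[numReduceThreads];
            CountDownLatch latch = new CountDownLatch(numDataTables);
            for (int i = 0; i < numReduceThreads; i++) {
              List<Integer> group = reduceGroups.get(i);
              futures[i] = executor.submit(() -> {
                for (Integer table : group) {
                  if (Thread.interrupted()) {
                    return;  // main thread gave up on the query; stop early
                  }
                  try {
                    Thread.sleep(100);  // stand-in for merging one data table
                  } catch (InterruptedException e) {
                    return;
                  } finally {
                    latch.countDown();
                  }
                }
              });
            }

            try {
              // The old code ignored this boolean, so a timeout went unnoticed.
              if (!latch.await(50, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Timed out in broker reduce phase");
              }
            } catch (TimeoutException e) {
              System.out.println("Detected timeout: " + e.getMessage());
            } finally {
              for (Future<?> future : futures) {  // always cancel stragglers
                if (!future.isDone()) {
                  future.cancel(true);
                }
              }
              executor.shutdownNow();
            }
          }
        }

    With 8 tables at ~100 ms each across 2 threads against a 50 ms budget, the
    await times out, the timeout is detected, and the in-flight workers are
    interrupted instead of running to completion unobserved.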
---
 .../core/query/reduce/GroupByDataTableReducer.java | 52 ++++++++------
 .../core/query/reduce/BrokerReduceServiceTest.java | 79 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 20 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index fd668c7..db01acd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.reduce;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -127,11 +128,11 @@ public class GroupByDataTableReducer implements DataTableReducer {
         try {
           setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
               brokerMetrics);
+          resultSize = brokerResponseNative.getResultTable().getRows().size();
         } catch (TimeoutException e) {
           brokerResponseNative.getProcessingExceptions()
               .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
         }
-        resultSize = brokerResponseNative.getResultTable().getRows().size();
       } else {
         // 2. groupByMode = sql, responseFormat = pql
         // This mode will invoke SQL style group by execution, but present results in PQL way
@@ -192,12 +193,18 @@ public class GroupByDataTableReducer implements DataTableReducer {
       Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
       BrokerMetrics brokerMetrics)
       throws TimeoutException {
-    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
-    if (brokerMetrics != null) {
-      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
-      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+    Iterator<Record> sortedIterator;
+    if (!dataTables.isEmpty()) {
+      IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+      if (brokerMetrics != null) {
+        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+        brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+      }
+      sortedIterator = indexedTable.iterator();
+    } else {
+      sortedIterator = Collections.emptyIterator();
     }
-    Iterator<Record> sortedIterator = indexedTable.iterator();
+
     DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
     ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
@@ -302,7 +309,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
     int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
     int trimThreshold = reducerContext.getGroupByTrimThreshold();
     IndexedTable indexedTable;
-    if (numReduceThreadsToUse <= 1) {
+    if (numReduceThreadsToUse == 1) {
       indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
     } else {
       if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
@@ -316,9 +323,6 @@ public class GroupByDataTableReducer implements DataTableReducer {
       }
     }
 
-    Future[] futures = new Future[numDataTables];
-    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
-
     // Create groups of data tables that each thread can process concurrently.
     // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
     ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
@@ -331,16 +335,21 @@ public class GroupByDataTableReducer implements DataTableReducer {
       reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
     }
 
-    int cnt = 0;
+    Future[] futures = new Future[numReduceThreadsToUse];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
     ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
-    for (List<DataTable> reduceGroup : reduceGroups) {
-      futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      List<DataTable> reduceGroup = reduceGroups.get(i);
+      futures[i] = reducerContext.getExecutorService().submit(new TraceRunnable() {
         @Override
         public void runJob() {
           for (DataTable dataTable : reduceGroup) {
-            int numRows = dataTable.getNumberOfRows();
-
+            // Terminate when thread is interrupted. This is expected when the query already fails in the main thread.
+            if (Thread.interrupted()) {
+              return;
+            }
             try {
+              int numRows = dataTable.getNumberOfRows();
               for (int rowId = 0; rowId < numRows; rowId++) {
                 Object[] values = new Object[_numColumns];
                 for (int colId = 0; colId < _numColumns; colId++) {
@@ -383,14 +392,17 @@ public class GroupByDataTableReducer implements DataTableReducer {
 
     try {
       long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
-      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+      if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
+        throw new TimeoutException("Timed out in broker reduce phase");
+      }
     } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted in broker reduce phase", e);
+    } finally {
       for (Future future : futures) {
         if (!future.isDone()) {
           future.cancel(true);
         }
       }
-      throw new TimeoutException("Timed out in broker reduce phase.");
     }
 
     indexedTable.finish(true);
@@ -412,10 +424,10 @@ public class GroupByDataTableReducer implements DataTableReducer {
   private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
     // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
     if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
-      return Math.min(1, numDataTables); // Number of data tables can be zero.
+      return 1;
+    } else {
+      return Math.min(numDataTables, maxReduceThreadsPerQuery);
     }
-
-    return Math.min(maxReduceThreadsPerQuery, numDataTables);
   }
 
   /**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
new file mode 100644
index 0000000..884f933
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class BrokerReduceServiceTest {
+  private static final CalciteSqlCompiler COMPILER = new CalciteSqlCompiler();
+
+  @Test
+  public void testReduceTimeout()
+      throws IOException {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2);
+    BrokerReduceService brokerReduceService = new BrokerReduceService(new PinotConfiguration(properties));
+
+    BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(
+        "SELECT COUNT(*) FROM testTable GROUP BY col1 OPTION(groupByMode=sql,responseFormat=sql)");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1", "count(*)"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
+    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    int numGroups = 5000;
+    for (int i = 0; i < numGroups; i++) {
+      dataTableBuilder.startRow();
+      dataTableBuilder.setColumn(0, i);
+      dataTableBuilder.setColumn(1, 1L);
+      dataTableBuilder.finishRow();
+    }
+    DataTable dataTable = dataTableBuilder.build();
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numInstances = 1000;
+    for (int i = 0; i < numInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 1;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, null);
+    List<QueryProcessingException> processingExceptions = brokerResponse.getProcessingExceptions();
+    assertEquals(processingExceptions.size(), 1);
+    assertEquals(processingExceptions.get(0).getErrorCode(), QueryException.BROKER_TIMEOUT_ERROR_CODE);
+  }
+}
