You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/11/09 18:27:35 UTC
(pinot) branch master updated: instrument building datatable (#11942)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bb52625f8a instrument building datatable (#11942)
bb52625f8a is described below
commit bb52625f8a1751b054c1344c5561bcba130c2daf
Author: Sabrina Zhao <yi...@linkedin.com>
AuthorDate: Thu Nov 9 10:27:30 2023 -0800
instrument building datatable (#11942)
---
.../pinot/common/datatable/BaseDataTable.java | 4 +
.../pinot/common/datatable/DataTableImplV4.java | 4 +
.../blocks/results/GroupByResultsBlock.java | 6 +
.../pinot/core/query/scheduler/QueryScheduler.java | 12 ++
.../query/selection/SelectionOperatorUtils.java | 3 +
.../accounting/ResourceManagerAccountingTest.java | 145 +++++++++++++++++++++
...flineClusterMemBasedServerQueryKillingTest.java | 14 ++
7 files changed, 188 insertions(+)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
index 5fce2ef759..ab0a227a11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
@@ -94,7 +95,9 @@ public abstract class BaseDataTable implements DataTable {
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
dataOutputStream.writeInt(_dictionaryMap.size());
+ int numEntriesAdded = 0;
for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry : _dictionaryMap.entrySet()) {
+ Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numEntriesAdded);
String columnName = dictionaryMapEntry.getKey();
Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
byte[] bytes = columnName.getBytes(UTF_8);
@@ -108,6 +111,7 @@ public abstract class BaseDataTable implements DataTable {
dataOutputStream.writeInt(valueBytes.length);
dataOutputStream.write(valueBytes);
}
+ numEntriesAdded++;
}
return byteArrayOutputStream.toByteArray();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index 963a2e96cc..b58669fd4b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -335,10 +336,13 @@ public class DataTableImplV4 implements DataTable {
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
dataOutputStream.writeInt(_stringDictionary.length);
+ int numEntriesAdded = 0;
for (String entry : _stringDictionary) {
+ Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numEntriesAdded);
byte[] valueBytes = entry.getBytes(UTF_8);
dataOutputStream.writeInt(valueBytes.length);
dataOutputStream.write(valueBytes);
+ numEntriesAdded++;
}
return byteArrayOutputStream.toByteArray();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index c469363be9..81f46e6277 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -180,6 +181,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
int numColumns = _dataSchema.size();
Iterator<Record> iterator = _table.iterator();
+ int numRowsAdded = 0;
if (_queryContext.isNullHandlingEnabled()) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
Object[] nullPlaceholders = new Object[numColumns];
@@ -189,6 +191,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
}
int rowId = 0;
while (iterator.hasNext()) {
+ Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
dataTableBuilder.startRow();
Object[] values = iterator.next().getValues();
for (int colId = 0; colId < numColumns; colId++) {
@@ -200,6 +203,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
setDataTableColumn(storedColumnDataTypes[colId], dataTableBuilder, colId, value);
}
dataTableBuilder.finishRow();
+ numRowsAdded++;
rowId++;
}
for (RoaringBitmap nullBitmap : nullBitmaps) {
@@ -207,12 +211,14 @@ public class GroupByResultsBlock extends BaseResultsBlock {
}
} else {
while (iterator.hasNext()) {
+ Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
dataTableBuilder.startRow();
Object[] values = iterator.next().getValues();
for (int colId = 0; colId < numColumns; colId++) {
setDataTableColumn(storedColumnDataTypes[colId], dataTableBuilder, colId, values[colId]);
}
dataTableBuilder.finishRow();
+ numRowsAdded++;
}
}
return dataTableBuilder.build();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 8f5bcb887f..c4822e27e8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
@@ -34,12 +35,15 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,6 +219,14 @@ public abstract class QueryScheduler {
byte[] responseByte = null;
try {
responseByte = instanceResponse.toDataTable().toBytes();
+ } catch (EarlyTerminationException e) {
+ Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
+ String errMsg =
+ "Cancelled while building data table" + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg);
+ LOGGER.error(errMsg);
+ instanceResponse = new InstanceResponseBlock(new ExceptionResultsBlock(new QueryCancelledException(errMsg, e)));
+ instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId()));
+ return serializeResponse(queryRequest, instanceResponse);
} catch (Exception e) {
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
LOGGER.error("Caught exception while serializing response for requestId: {}, brokerId: {}",
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 19b6c7f954..681fbe44b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -383,7 +383,9 @@ public class SelectionOperatorUtils {
}
}
+ int numRowsAdded = 0;
for (Object[] row : rows) {
+ Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
dataTableBuilder.startRow();
for (int i = 0; i < numColumns; i++) {
Object columnValue = row[i];
@@ -438,6 +440,7 @@ public class SelectionOperatorUtils {
}
}
dataTableBuilder.finishRow();
+ numRowsAdded++;
}
if (nullHandlingEnabled) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 0a6199e71c..d3100ac8bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -18,22 +18,35 @@
*/
package org.apache.pinot.core.accounting;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant;
+import org.apache.pinot.core.common.datablock.DataBlockTestUtils;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -50,6 +63,7 @@ import org.testng.annotations.Test;
public class ResourceManagerAccountingTest {
public static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerAccountingTest.class);
+ private static final int NUM_ROWS = 1_000_000;
/**
* Test thread cpu usage tracking in multithread environment, add @Test to run.
@@ -223,6 +237,137 @@ public class ResourceManagerAccountingTest {
Assert.fail("Expected EarlyTerminationException to be thrown");
}
+ /**
+ * Test instrumentation during {@link DataTable} creation
+ */
+ @Test
+ public void testGetDataTableOOMSelect()
+ throws Exception {
+
+ // generate random rows
+ String[] columnNames = {
+ "int", "long", "float", "double", "big_decimal", "string", "bytes", "int_array", "long_array", "float_array",
+ "double_array", "string_array"
+ };
+ DataSchema.ColumnDataType[] columnDataTypes = {
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.FLOAT,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.BIG_DECIMAL, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.BYTES, DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.LONG_ARRAY,
+ DataSchema.ColumnDataType.FLOAT_ARRAY, DataSchema.ColumnDataType.DOUBLE_ARRAY,
+ DataSchema.ColumnDataType.STRING_ARRAY
+ };
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, NUM_ROWS, 0);
+
+ // set up logging and configs
+ LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.OFF);
+ LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.OFF);
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+ HashMap<String, Object> configs = new HashMap<>();
+ ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+ configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+ "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+ configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+ PinotConfiguration config = getConfig(20, 2, configs);
+ ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
+ // init accountant and start watcher task
+ Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testSelect");
+
+ CountDownLatch latch = new CountDownLatch(100);
+ AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
+
+ for (int i = 0; i < 100; i++) {
+ int finalI = i;
+ rm.getQueryRunners().submit(() -> {
+ Tracing.ThreadAccountantOps.setupRunner("testSelectQueryId" + finalI);
+ try {
+ SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false).toBytes();
+ } catch (EarlyTerminationException e) {
+ earlyTerminationOccurred.set(true);
+ Tracing.ThreadAccountantOps.clear();
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+ latch.await();
+ // assert that EarlyTerminationException was thrown in at least one runner thread
+ Assert.assertTrue(earlyTerminationOccurred.get());
+ }
+
+ /**
+ * Test instrumentation during {@link DataTable} creation
+ */
+ @Test
+ public void testGetDataTableOOMGroupBy()
+ throws Exception {
+
+ // generate random indexedTable
+ QueryContext queryContext =
+ QueryContextConverterUtils.getQueryContext("SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3, d4");
+ DataSchema dataSchema =
+ new DataSchema(new String[]{"d1", "d2", "d3", "d4", "sum(m1)", "max(m2)"}, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+ });
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, NUM_ROWS, 0);
+ IndexedTable indexedTable =
+ new SimpleIndexedTable(dataSchema, queryContext, NUM_ROWS, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ for (Object[] row : rows) {
+ indexedTable.upsert(new Record(row));
+ }
+ indexedTable.finish(false);
+ // set up GroupByResultsBlock
+ GroupByResultsBlock groupByResultsBlock = new GroupByResultsBlock(indexedTable, queryContext);
+
+ // set up logging and configs
+ LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.OFF);
+ LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.OFF);
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+ HashMap<String, Object> configs = new HashMap<>();
+ ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+ configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+ "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+ configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+ configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+ PinotConfiguration config = getConfig(20, 2, configs);
+ ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
+ // init accountant and start watcher task
+ Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testGroupBy");
+
+ CountDownLatch latch = new CountDownLatch(100);
+ AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
+
+ for (int i = 0; i < 100; i++) {
+ int finalI = i;
+ rm.getQueryRunners().submit(() -> {
+ Tracing.ThreadAccountantOps.setupRunner("testGroupByQueryId" + finalI);
+ try {
+ groupByResultsBlock.getDataTable().toBytes();
+ } catch (EarlyTerminationException e) {
+ earlyTerminationOccurred.set(true);
+ Tracing.ThreadAccountantOps.clear();
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+ latch.await();
+ // assert that EarlyTerminationException was thrown in at least one runner thread
+ Assert.assertTrue(earlyTerminationOccurred.get());
+ }
+
/**
* Test thread memory usage tracking and query killing in multi-thread environment, add @Test to run.
*/
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 2e866805ea..c8a70ae031 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -80,6 +80,7 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
"SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest FROM mytable";
private static final String COUNT_STAR_QUERY =
"SELECT * FROM mytable LIMIT 5";
+ private static final String OOM_QUERY_SELECTION_ONLY = "SELECT * FROM mytable LIMIT 9000000";
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(20, r -> {
Thread thread = new Thread(r);
@@ -223,6 +224,19 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
}
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testSelectionOnlyOOM(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ notSupportedInV2();
+ JsonNode queryResponse = postQuery(OOM_QUERY_SELECTION_ONLY);
+
+ Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":"
+ + QueryException.QUERY_CANCELLATION_ERROR_CODE));
+ Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
+ Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testDigestOOM2(boolean useMultiStageQueryEngine)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org