Posted to commits@pinot.apache.org by si...@apache.org on 2023/11/09 18:27:35 UTC

(pinot) branch master updated: instrument building datatable (#11942)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bb52625f8a instrument building datatable (#11942)
bb52625f8a is described below

commit bb52625f8a1751b054c1344c5561bcba130c2daf
Author: Sabrina Zhao <yi...@linkedin.com>
AuthorDate: Thu Nov 9 10:27:30 2023 -0800

    instrument building datatable (#11942)
---
 .../pinot/common/datatable/BaseDataTable.java      |   4 +
 .../pinot/common/datatable/DataTableImplV4.java    |   4 +
 .../blocks/results/GroupByResultsBlock.java        |   6 +
 .../pinot/core/query/scheduler/QueryScheduler.java |  12 ++
 .../query/selection/SelectionOperatorUtils.java    |   3 +
 .../accounting/ResourceManagerAccountingTest.java  | 145 +++++++++++++++++++++
 ...flineClusterMemBasedServerQueryKillingTest.java |  14 ++
 7 files changed, 188 insertions(+)
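
The hooks share one motive: building and serializing a DataTable can loop over millions of rows or dictionary entries with no cancellation point, so a query already flagged for killing by the memory accountant could previously only be stopped after serialization finished. This commit threads Pinot's cooperative-cancellation hook, Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically, through each of those hot loops. A minimal, self-contained sketch of the pattern (the class, field, and constant below are illustrative, not Pinot APIs):

    // Sketch: a watcher thread sets a kill flag; the hot loop polls it on a
    // cheap counter gate and bails out between iterations, with no need for
    // Thread.interrupt(). Illustrative only.
    final class CooperativeLoopSketch {
      static final int CHECK_PERIOD = 8192; // assumed cadence, not Pinot's actual constant
      volatile boolean _killed;             // set by a watcher thread under heap pressure

      void serializeRows(int numRows) {
        for (int i = 0; i < numRows; i++) {
          if (i % CHECK_PERIOD == 0 && _killed) {
            // Pinot throws EarlyTerminationException at this point
            throw new RuntimeException("query killed while building data table");
          }
          // ... serialize one row ...
        }
      }
    }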

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
index 5fce2ef759..ab0a227a11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
@@ -94,7 +95,9 @@ public abstract class BaseDataTable implements DataTable {
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
 
     dataOutputStream.writeInt(_dictionaryMap.size());
+    int numEntriesAdded = 0;
     for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry : _dictionaryMap.entrySet()) {
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numEntriesAdded);
       String columnName = dictionaryMapEntry.getKey();
       Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
       byte[] bytes = columnName.getBytes(UTF_8);
@@ -108,6 +111,7 @@ public abstract class BaseDataTable implements DataTable {
         dataOutputStream.writeInt(valueBytes.length);
         dataOutputStream.write(valueBytes);
       }
+      numEntriesAdded++;
     }
 
     return byteArrayOutputStream.toByteArray();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index 963a2e96cc..b58669fd4b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
@@ -335,10 +336,13 @@ public class DataTableImplV4 implements DataTable {
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
 
     dataOutputStream.writeInt(_stringDictionary.length);
+    int numEntriesAdded = 0;
     for (String entry : _stringDictionary) {
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numEntriesAdded);
       byte[] valueBytes = entry.getBytes(UTF_8);
       dataOutputStream.writeInt(valueBytes.length);
       dataOutputStream.write(valueBytes);
+      numEntriesAdded++;
     }
 
     return byteArrayOutputStream.toByteArray();
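
The same counter-gated call now guards both dictionary-serialization loops (BaseDataTable above and DataTableImplV4 here). Since these loops run per entry, the check itself has to be nearly free; the counter argument lets the accountant restrict the actual usage sample and kill-flag check to every Nth invocation. One plausible shape for that gating, with illustrative constants and stub hooks rather than Pinot's actual internals:

    // Illustrative gating sketch: most calls cost a single modulo and branch;
    // only every Nth call pays for a usage sample plus a flag check.
    final class PeriodicCheckSketch {
      static final int PERIOD = 8192; // illustrative; the real cadence is an accountant detail

      static void sampleAndCheckPeriodically(int callIndex) {
        if (callIndex % PERIOD == 0) {
          sampleThreadUsage();      // stub: record this thread's allocated bytes
          if (flaggedForKill()) {   // stub: per-query flag set by the watcher task
            throw new RuntimeException("early termination"); // Pinot: EarlyTerminationException
          }
        }
      }

      static void sampleThreadUsage() { /* no-op stub */ }
      static boolean flaggedForKill() { return false; }     // stub
    }
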
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index c469363be9..81f46e6277 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -180,6 +181,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
     ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
     int numColumns = _dataSchema.size();
     Iterator<Record> iterator = _table.iterator();
+    int numRowsAdded = 0;
     if (_queryContext.isNullHandlingEnabled()) {
       RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
       Object[] nullPlaceholders = new Object[numColumns];
@@ -189,6 +191,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
       }
       int rowId = 0;
       while (iterator.hasNext()) {
+        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
         dataTableBuilder.startRow();
         Object[] values = iterator.next().getValues();
         for (int colId = 0; colId < numColumns; colId++) {
@@ -200,6 +203,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
           setDataTableColumn(storedColumnDataTypes[colId], dataTableBuilder, colId, value);
         }
         dataTableBuilder.finishRow();
+        numRowsAdded++;
         rowId++;
       }
       for (RoaringBitmap nullBitmap : nullBitmaps) {
@@ -207,12 +211,14 @@ public class GroupByResultsBlock extends BaseResultsBlock {
       }
     } else {
       while (iterator.hasNext()) {
+        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
         dataTableBuilder.startRow();
         Object[] values = iterator.next().getValues();
         for (int colId = 0; colId < numColumns; colId++) {
           setDataTableColumn(storedColumnDataTypes[colId], dataTableBuilder, colId, values[colId]);
         }
         dataTableBuilder.finishRow();
+        numRowsAdded++;
       }
     }
     return dataTableBuilder.build();
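
In GroupByResultsBlock, numRowsAdded is declared once before the null-handling branch, so both code paths (with and without null bitmaps) share the same counter and the interruption check fires at an identical per-row cadence. Condensed from the hunks above, the shared shape of both loops is:

    int numRowsAdded = 0;
    while (iterator.hasNext()) {
      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
      dataTableBuilder.startRow();
      // ... write each column, tracking null bitmaps when null handling is on ...
      dataTableBuilder.finishRow();
      numRowsAdded++;
    }
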
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 8f5bcb887f..c4822e27e8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -34,12 +35,15 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -215,6 +219,14 @@ public abstract class QueryScheduler {
     byte[] responseByte = null;
     try {
       responseByte = instanceResponse.toDataTable().toBytes();
+    } catch (EarlyTerminationException e) {
+      Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
+      String errMsg =
+          "Cancelled while building data table" + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg);
+      LOGGER.error(errMsg);
+      instanceResponse = new InstanceResponseBlock(new ExceptionResultsBlock(new QueryCancelledException(errMsg, e)));
+      instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId()));
+      return serializeResponse(queryRequest, instanceResponse);
     } catch (Exception e) {
       _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
       LOGGER.error("Caught exception while serializing response for requestId: {}, brokerId: {}",
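
With the loops instrumented, an EarlyTerminationException can now escape from toDataTable().toBytes() itself, so QueryScheduler catches it ahead of the generic handler: instead of metering a serialization failure, it wraps the accountant's error status in a QueryCancelledException and returns that as the response. A hedged, self-contained sketch of the same caller-side pattern (Response, buildErrorResponse, and meterSerializationError are stand-ins, not Pinot APIs):

    // Sketch: translate a cooperative kill raised during serialization into a
    // structured "query cancelled" response instead of a serialization error.
    final class SerializeWithCancellationSketch {
      interface Response { byte[] toBytes() throws Exception; }

      byte[] serialize(Response response) {
        try {
          return response.toBytes();
        } catch (RuntimeException e) { // Pinot narrows this to EarlyTerminationException
          return buildErrorResponse("Cancelled while building data table: " + e.getMessage());
        } catch (Exception e) {
          meterSerializationError();   // stub metric hook
          return buildErrorResponse("Serialization failed: " + e.getMessage());
        }
      }

      byte[] buildErrorResponse(String message) { return message.getBytes(); } // stub
      void meterSerializationError() { /* no-op stub */ }
    }
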
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 19b6c7f954..681fbe44b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -383,7 +383,9 @@ public class SelectionOperatorUtils {
       }
     }
 
+    int numRowsAdded = 0;
     for (Object[] row : rows) {
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numRowsAdded);
       dataTableBuilder.startRow();
       for (int i = 0; i < numColumns; i++) {
         Object columnValue = row[i];
@@ -438,6 +440,7 @@ public class SelectionOperatorUtils {
         }
       }
       dataTableBuilder.finishRow();
+      numRowsAdded++;
     }
 
     if (nullHandlingEnabled) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 0a6199e71c..d3100ac8bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -18,22 +18,35 @@
  */
 package org.apache.pinot.core.accounting;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant;
+import org.apache.pinot.core.common.datablock.DataBlockTestUtils;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -50,6 +63,7 @@ import org.testng.annotations.Test;
 public class ResourceManagerAccountingTest {
 
   public static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerAccountingTest.class);
+  private static final int NUM_ROWS = 1_000_000;
 
   /**
    * Test thread cpu usage tracking in multithread environment, add @Test to run.
@@ -223,6 +237,137 @@ public class ResourceManagerAccountingTest {
     Assert.fail("Expected EarlyTerminationException to be thrown");
   }
 
+  /**
+   * Test instrumentation during {@link DataTable} creation
+   */
+  @Test
+  public void testGetDataTableOOMSelect()
+      throws Exception {
+
+    // generate random rows
+    String[] columnNames = {
+        "int", "long", "float", "double", "big_decimal", "string", "bytes", "int_array", "long_array", "float_array",
+        "double_array", "string_array"
+    };
+    DataSchema.ColumnDataType[] columnDataTypes = {
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.FLOAT,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.BIG_DECIMAL, DataSchema.ColumnDataType.STRING,
+        DataSchema.ColumnDataType.BYTES, DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.LONG_ARRAY,
+        DataSchema.ColumnDataType.FLOAT_ARRAY, DataSchema.ColumnDataType.DOUBLE_ARRAY,
+        DataSchema.ColumnDataType.STRING_ARRAY
+    };
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, NUM_ROWS, 0);
+
+    // set up logging and configs
+    LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.OFF);
+    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.OFF);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    PinotConfiguration config = getConfig(20, 2, configs);
+    ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
+    // init accountant and start watcher task
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testSelect");
+
+    CountDownLatch latch = new CountDownLatch(100);
+    AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
+
+    for (int i = 0; i < 100; i++) {
+      int finalI = i;
+      rm.getQueryRunners().submit(() -> {
+        Tracing.ThreadAccountantOps.setupRunner("testSelectQueryId" + finalI);
+        try {
+          SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, false).toBytes();
+        } catch (EarlyTerminationException e) {
+          earlyTerminationOccurred.set(true);
+          Tracing.ThreadAccountantOps.clear();
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        } finally {
+          latch.countDown();
+        }
+      });
+    }
+    latch.await();
+    // assert that EarlyTerminationException was thrown in at least one runner thread
+    Assert.assertTrue(earlyTerminationOccurred.get());
+  }
+
+  /**
+   * Test instrumentation during {@link DataTable} creation
+   */
+  @Test
+  public void testGetDataTableOOMGroupBy()
+      throws Exception {
+
+    // generate random indexedTable
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT SUM(m1), MAX(m2) FROM testTable GROUP BY d1, d2, d3, d4");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"d1", "d2", "d3", "d4", "sum(m1)", "max(m2)"}, new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
+        });
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, NUM_ROWS, 0);
+    IndexedTable indexedTable =
+        new SimpleIndexedTable(dataSchema, queryContext, NUM_ROWS, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    for (Object[] row : rows) {
+      indexedTable.upsert(new Record(row));
+    }
+    indexedTable.finish(false);
+    // set up GroupByResultsBlock
+    GroupByResultsBlock groupByResultsBlock = new GroupByResultsBlock(indexedTable, queryContext);
+
+    // set up logging and configs
+    LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.OFF);
+    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.OFF);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    PinotConfiguration config = getConfig(20, 2, configs);
+    ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
+    // init accountant and start watcher task
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testGroupBy");
+
+    CountDownLatch latch = new CountDownLatch(100);
+    AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
+
+    for (int i = 0; i < 100; i++) {
+      int finalI = i;
+      rm.getQueryRunners().submit(() -> {
+        Tracing.ThreadAccountantOps.setupRunner("testGroupByQueryId" + finalI);
+        try {
+          groupByResultsBlock.getDataTable().toBytes();
+        } catch (EarlyTerminationException e) {
+          earlyTerminationOccurred.set(true);
+          Tracing.ThreadAccountantOps.clear();
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        } finally {
+          latch.countDown();
+        }
+      });
+    }
+    latch.await();
+    // assert that EarlyTerminationException was thrown in at least one runner thread
+    Assert.assertTrue(earlyTerminationOccurred.get());
+  }
+
   /**
    * Test thread memory usage tracking and query killing in multi-thread environment, add @Test to run.
    */
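
Both new unit tests make the kill path deterministic rather than waiting for a real OOM: with the alarming and critical heap-usage ratios forced to 0.00f, per-thread memory sampling enabled, and OOM-protection killing on, the watcher treats any sampled heap usage as over the critical threshold and flags queries for killing almost immediately. Because which of the 100 runners gets flagged is timing-dependent, the tests only assert that at least one observes the exception. Condensed, the harness in both tests reduces to:

    CountDownLatch latch = new CountDownLatch(100);
    AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
    for (int i = 0; i < 100; i++) {
      rm.getQueryRunners().submit(() -> {
        try {
          buildAndSerializeDataTable(); // stand-in for the toBytes() calls above
        } catch (EarlyTerminationException e) {
          earlyTerminationOccurred.set(true);
        } finally {
          latch.countDown();
        }
      });
    }
    latch.await();
    Assert.assertTrue(earlyTerminationOccurred.get());
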
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 2e866805ea..c8a70ae031 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -80,6 +80,7 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
       "SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest FROM mytable";
   private static final String COUNT_STAR_QUERY =
       "SELECT * FROM mytable LIMIT 5";
+  private static final String OOM_QUERY_SELECTION_ONLY = "SELECT * FROM mytable LIMIT 9000000";
 
   private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(20, r -> {
     Thread thread = new Thread(r);
@@ -223,6 +224,19 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
     Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testSelectionOnlyOOM(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    notSupportedInV2();
+    JsonNode queryResponse = postQuery(OOM_QUERY_SELECTION_ONLY);
+
+    Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":"
+        + QueryException.QUERY_CANCELLATION_ERROR_CODE));
+    Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
+    Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testDigestOOM2(boolean useMultiStageQueryEngine)
       throws Exception {

