You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/06 22:38:38 UTC

(pinot) branch master updated: [bugfix][multistage] explicit warning flags set on each stage stats (#11936)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d177866527 [bugfix][multistage] explicit warning flags set on each stage stats (#11936)
d177866527 is described below

commit d177866527e3af3b8cdd350ac6b0762edc99b43a
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Nov 6 14:38:32 2023 -0800

    [bugfix][multistage] explicit warning flags set on each stage stats (#11936)
    
    * set explicit warning flags on each stage
    * add a top-level flag for maxRowsInJoinReached because the UI depends only on the top-level flag
    * simplify the logic in early termination for HashJoinOperator per comments
    * set partial result flag in multi-stage handler
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  8 +++-
 .../MultiStageBrokerRequestHandler.java            |  3 ++
 .../apache/pinot/common/datatable/DataTable.java   |  5 ++-
 .../pinot/common/response/BrokerResponse.java      |  5 +++
 .../response/broker/BrokerResponseNative.java      | 21 +++++++--
 .../response/broker/BrokerResponseNativeV2.java    | 11 ++---
 .../response/broker/BrokerResponseStats.java       | 14 +++---
 .../query/reduce/ExecutionStatsAggregator.java     |  6 +++
 .../query/runtime/operator/AggregateOperator.java  |  8 ++--
 .../query/runtime/operator/HashJoinOperator.java   | 32 ++++++--------
 .../runtime/operator/AggregateOperatorTest.java    | 50 ++++++++++++++++++++++
 .../runtime/operator/HashJoinOperatorTest.java     | 16 +++++--
 12 files changed, 133 insertions(+), 46 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index eed9ae56d4..bcc5049ff0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -775,8 +775,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
             1);
       }
-      brokerResponse.setPartialResult(
-          brokerResponse.isNumGroupsLimitReached() || brokerResponse.getExceptionsSize() > 0);
+      brokerResponse.setPartialResult(isPartialResult(brokerResponse));
 
       // Set total query processing time
       long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
@@ -1821,6 +1820,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       RequestContext requestContext)
       throws Exception;
 
+  protected static boolean isPartialResult(BrokerResponse brokerResponse) {
+    return brokerResponse.isNumGroupsLimitReached() || brokerResponse.isMaxRowsInJoinReached()
+        || brokerResponse.getExceptionsSize() > 0;
+  }
+
   protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
     statistics.setTotalDocs(response.getTotalDocs());
     statistics.setNumDocsScanned(response.getNumDocsScanned());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f3cbaffb1b..d329a99f69 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -238,6 +238,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
     }
 
+    // Set partial result flag
+    brokerResponse.setPartialResult(isPartialResult(brokerResponse));
+
     // Set total query processing time
     // TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
     long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 928834359f..72121611eb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -134,11 +134,12 @@ public interface DataTable {
     OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", MetadataValueType.LONG),
     OPERATOR_ID(31, "operatorId", MetadataValueType.STRING),
     OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs", MetadataValueType.LONG),
-    OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG);
+    OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG),
+    MAX_ROWS_IN_JOIN_REACHED(34, "maxRowsInJoinReached", MetadataValueType.STRING);
 
     // We keep this constant to track the max id added so far for backward compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = 33;
+    private static final int MAX_ID = 34;
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 89fcc8c04b..06f65fc2aa 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -133,6 +133,11 @@ public interface BrokerResponse {
    */
   boolean isNumGroupsLimitReached();
 
+  /**
+   * Returns whether the limit for max rows in join has been reached.
+   */
+  boolean isMaxRowsInJoinReached();
+
   /**
    * Get number of exceptions recorded in the response.
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 83887d0a3b..9fe098e26d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -44,10 +44,12 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "resultTable", "requestId", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
     "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
     "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
-    "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
-    "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
-    "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
-    "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo", "partialResult"})
+    "numEntriesScannedPostFilter", "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs",
+    "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
+    "segmentStatistics", "traceInfo", "partialResult"
+})
 public class BrokerResponseNative implements BrokerResponse {
   public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
   public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -76,6 +78,7 @@ public class BrokerResponseNative implements BrokerResponse {
 
   private long _totalDocs = 0L;
   private boolean _numGroupsLimitReached = false;
+  private boolean _maxRowsInJoinReached = false;
   private boolean _partialResult = false;
   private long _timeUsedMs = 0L;
   private long _offlineThreadCpuTimeNs = 0L;
@@ -494,6 +497,16 @@ public class BrokerResponseNative implements BrokerResponse {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  @JsonProperty("maxRowsInJoinReached")
+  public boolean isMaxRowsInJoinReached() {
+    return _maxRowsInJoinReached;
+  }
+
+  @JsonProperty("maxRowsInJoinReached")
+  public void setMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
+    _maxRowsInJoinReached = maxRowsInJoinReached;
+  }
+
   @JsonProperty("partialResult")
   public boolean isPartialResult() {
     return _partialResult;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 95943881dc..60a22460f3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -37,13 +37,14 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * Supports serialization via JSON.
  */
 @JsonPropertyOrder({
-    "resultTable", "requestId", "stageStats", "exceptions", "numServersQueried", "numServersResponded",
+    "resultTable", "requestId", "stageStats", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
     "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
     "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
-    "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
-    "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
-    "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
-    "realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
+    "numEntriesScannedPostFilter", "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs",
+    "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
+    "segmentStatistics", "traceInfo", "partialResult"
 })
 public class BrokerResponseNativeV2 extends BrokerResponseNative {
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index db23034efc..3acc9f3349 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,14 +33,16 @@ import org.apache.pinot.spi.utils.JsonUtils;
 //  same metadataKey
 // TODO: Replace member fields with a simple map of <MetadataKey, Object>
 // TODO: Add a subStat field, stage level subStats will contain each operator stats
-@JsonPropertyOrder({"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs",
-    "stageExecutionUnit", "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded",
-    "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
-    "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
-    "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+@JsonPropertyOrder({
+    "brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
+    "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
+    "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+    "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
     "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
     "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
-    "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"})
+    "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"
+})
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class BrokerResponseStats extends BrokerResponseNative {
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index bc8612c551..c05644af5f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -74,6 +74,7 @@ public class ExecutionStatsAggregator {
   private long _explainPlanNumEmptyFilterSegments = 0L;
   private long _explainPlanNumMatchAllFilterSegments = 0L;
   private boolean _numGroupsLimitReached = false;
+  private boolean _maxRowsInJoinReached = false;
   private int _numBlocks = 0;
   private int _numRows = 0;
   private long _stageExecutionTimeMs = 0;
@@ -250,6 +251,8 @@ public class ExecutionStatsAggregator {
     _numGroupsLimitReached |=
         Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
 
+    _maxRowsInJoinReached |=
+        Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName()));
 
     String numBlocksString = metadata.get(DataTable.MetadataKey.NUM_BLOCKS.getName());
     if (numBlocksString != null) {
@@ -306,6 +309,7 @@ public class ExecutionStatsAggregator {
     brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
     brokerResponseNative.setTotalDocs(_numTotalDocs);
     brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
+    brokerResponseNative.setMaxRowsInJoinReached(_maxRowsInJoinReached);
     brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
     brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
     brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
@@ -369,6 +373,8 @@ public class ExecutionStatsAggregator {
 
     brokerResponseStats.setNumBlocks(_numBlocks);
     brokerResponseStats.setNumRows(_numRows);
+    brokerResponseStats.setMaxRowsInJoinReached(_maxRowsInJoinReached);
+    brokerResponseStats.setNumGroupsLimitReached(_numGroupsLimitReached);
     brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
     brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
     brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 27b6cbdf08..5cb825c149 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
@@ -160,9 +160,9 @@ public class AggregateOperator extends MultiStageOperator {
       } else {
         TransferableBlock dataBlock = new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
         if (_groupByExecutor.isNumGroupsLimitReached()) {
-          dataBlock.addException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE,
-              String.format("Reached numGroupsLimit of: %d for group-by, ignoring the extra groups",
-                  _groupByExecutor.getNumGroupsLimit()));
+          OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+          operatorStats.recordSingleStat(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
+          _inputOperator.earlyTerminate();
         }
         return dataBlock;
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 295e0c49af..c508b373c5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -110,7 +111,6 @@ public class HashJoinOperator extends MultiStageOperator {
   private final JoinOverFlowMode _joinOverflowMode;
 
   private int _currentRowsInHashTable = 0;
-  private ProcessingException _resourceLimitExceededException = null;
 
   public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftTableOperator,
       MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) {
@@ -191,7 +191,7 @@ public class HashJoinOperator extends MultiStageOperator {
   protected TransferableBlock getNextBlock() {
     try {
       if (_isTerminated) {
-        return setPartialResultExceptionToBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
       if (!_isHashTableBuilt) {
         // Build JOIN hash table
@@ -202,7 +202,7 @@ public class HashJoinOperator extends MultiStageOperator {
       }
       TransferableBlock leftBlock = _leftTableOperator.nextBlock();
       // JOIN each left block with the right block.
-      return setPartialResultExceptionToBlock(buildJoinedDataBlock(leftBlock));
+      return buildJoinedDataBlock(leftBlock);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
@@ -215,16 +215,21 @@ public class HashJoinOperator extends MultiStageOperator {
       List<Object[]> container = rightBlock.getContainer();
       // Row based overflow check.
       if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
-        _resourceLimitExceededException =
-            new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
-        _resourceLimitExceededException.setMessage(
-            "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable);
         if (_joinOverflowMode == JoinOverFlowMode.THROW) {
-          throw _resourceLimitExceededException;
+          ProcessingException resourceLimitExceededException =
+              new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+          resourceLimitExceededException.setMessage(
+              "Cannot build in memory hash table for join operator, reach number of rows limit: "
+                  + _maxRowsInHashTable);
+          throw resourceLimitExceededException;
         } else {
           // Just fill up the buffer.
           int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
           container = container.subList(0, remainingRows);
+          OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+          operatorStats.recordSingleStat(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName(), "true");
+          // setting only the rightTableOperator to be early terminated and awaits EOS block next.
+          _rightTableOperator.earlyTerminate();
         }
       }
       // put all the rows into corresponding hash collections keyed by the key selector function.
@@ -238,10 +243,6 @@ public class HashJoinOperator extends MultiStageOperator {
         hashCollection.add(row);
       }
       _currentRowsInHashTable += container.size();
-      if (_currentRowsInHashTable == _maxRowsInHashTable) {
-        // setting only the rightTableOperator to be early terminated and awaits EOS block next.
-        _rightTableOperator.earlyTerminate();
-      }
       rightBlock = _rightTableOperator.nextBlock();
     }
     if (rightBlock.isErrorBlock()) {
@@ -300,13 +301,6 @@ public class HashJoinOperator extends MultiStageOperator {
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
-  private TransferableBlock setPartialResultExceptionToBlock(TransferableBlock block) {
-    if (_resourceLimitExceededException != null) {
-      block.addException(_resourceLimitExceededException);
-    }
-    return block;
-  }
-
   private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
     List<Object[]> container = leftBlock.getContainer();
     List<Object[]> rows = new ArrayList<>(container.size());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0ad3276d26..93b65dad7f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -18,17 +18,25 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.AggregateNode.AggType;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -65,6 +73,12 @@ public class AggregateOperatorTest {
     _mocks.close();
   }
 
+  private static AbstractPlanNode.NodeHint getAggHints(Map<String, String> hintsMap) {
+    RelHint.Builder relHintBuilder = RelHint.builder(PinotHintOptions.AGGREGATE_HINT_OPTIONS);
+    hintsMap.forEach(relHintBuilder::hintOption);
+    return new AbstractPlanNode.NodeHint(ImmutableList.of(relHintBuilder.build()));
+  }
+
   @Test
   public void shouldHandleUpstreamErrorBlocks() {
     // Given:
@@ -256,6 +270,42 @@ public class AggregateOperatorTest {
         "expected it to fail with class cast exception");
   }
 
+  @Test
+  public void shouldHandleGroupLimitExceed() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, DOUBLE});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1.0}, new Object[]{3, 2.0}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, 3.0}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
+    OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+    Map<String, String> hintsMap = ImmutableMap.of(PinotHintOptions.AggregateOptions.NUM_GROUPS_LIMIT, "1");
+    AggregateOperator operator =
+        new AggregateOperator(context, _input, outSchema, calls, group, AggType.DIRECT, Collections.singletonList(-1),
+            getAggHints(hintsMap));
+
+    // When:
+    TransferableBlock block1 = operator.nextBlock();
+    TransferableBlock block2 = operator.nextBlock();
+
+    // Then
+    Mockito.verify(_input).earlyTerminate();
+
+    // Then:
+    Assert.assertTrue(block1.getNumRows() == 1, "when group limit reach it should only return that many groups");
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "Second block is EOS (done processing)");
+    String operatorId =
+        Joiner.on("_").join(AggregateOperator.class.getSimpleName(), context.getStageId(), context.getServer());
+    OperatorStats operatorStats = context.getStats().getOperatorStats(context, operatorId);
+    Assert.assertEquals(operatorStats.getExecutionStats().get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()),
+        "true");
+  }
+
   private static RexExpression.FunctionCall getSum(RexExpression arg) {
     return new RexExpression.FunctionCall(SqlKind.SUM, ColumnDataType.INT, "SUM", ImmutableList.of(arg));
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 88d9cf4fe0..fa99ce0c01 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
@@ -29,6 +30,7 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -37,6 +39,7 @@ import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -642,15 +645,20 @@ public class HashJoinOperatorTest {
         PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1");
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
         getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, getJoinHints(hintsMap));
-    HashJoinOperator join =
-        new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
+
+    OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+    HashJoinOperator join = new HashJoinOperator(context, _leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     Mockito.verify(_rightOperator).earlyTerminate();
     Assert.assertFalse(result.isErrorBlock());
     Assert.assertEquals(result.getNumRows(), 1);
-    Assert.assertTrue(result.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
-        .contains("reach number of rows limit"));
+
+    String operatorId =
+        Joiner.on("_").join(HashJoinOperator.class.getSimpleName(), context.getStageId(), context.getServer());
+    OperatorStats operatorStats = context.getStats().getOperatorStats(context, operatorId);
+    Assert.assertEquals(
+        operatorStats.getExecutionStats().get(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName()), "true");
   }
 }
 // TODO: Add more inequi join tests.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org