You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/07/05 14:50:30 UTC

[pinot] branch master updated: specify how many segments were pruned by each server segment pruner (#8884)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new de16a0a35d specify how many segments were pruned by each server segment pruner (#8884)
de16a0a35d is described below

commit de16a0a35d93f3fe0393df563015e7dd1298ceb9
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Tue Jul 5 16:50:23 2022 +0200

    specify how many segments were pruned by each server segment pruner (#8884)
    
    * specify how many segments were pruned by each server segment pruner
    
    * force the query log when there are invalid segments
    
    * change header to javadoc
    
    * Add a warning section in Controller UI query view.
    
    Initially there is only one warning, which is shown when segments were pruned because they were invalid
    
    * Correct an assertion
    
    * Apply suggestions from code review
    
    Co-authored-by: Rong Rong <wa...@gmail.com>
    
    * Don't report empty segments as invalid
    
    * Add test on SegmentPrunerService
    
    * Update pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
    
    Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
    
    * Move new literals to the end of the enum list
    
    * Correctly apply suggested change
    
    * remove trailing whitespace
    
    * fix a compilation error
    
    Co-authored-by: Rong Rong <ro...@apache.org>
    Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   6 +-
 .../pinot/common/response/BrokerResponse.java      |  42 ++++++++
 .../response/broker/BrokerResponseNative.java      |  39 +++++++
 .../org/apache/pinot/common/utils/DataTable.java   |   3 +
 .../src/main/resources/app/pages/Query.tsx         |  21 ++++
 .../core/query/config/SegmentPrunerConfig.java     |   2 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  17 ++-
 .../pinot/core/query/pruner/SegmentPruner.java     |   2 +
 .../core/query/pruner/SegmentPrunerProvider.java   |   7 +-
 .../core/query/pruner/SegmentPrunerService.java    |  76 +++++++++++--
 ...entPruner.java => SegmentPrunerStatistics.java} |  55 +++++-----
 .../pinot/core/query/reduce/BaseReduceService.java |  23 +++-
 .../pinot/core/query/scheduler/QueryScheduler.java |  30 +++++-
 .../query/pruner/SegmentPrunerServiceTest.java     | 117 +++++++++++++++++++++
 14 files changed, 394 insertions(+), 46 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 0cbd12c537..776a4e1aae 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -75,7 +75,11 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   // Netty connection metrics
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
   NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
-  NETTY_CONNECTION_BYTES_SENT("nettyConnection", true);
+  NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
+
+  NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedInvalid", false),
+  NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", false),
+  NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),;
 
   private final String _meterName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index aaddaaa9cb..7e269f6421 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -241,6 +241,48 @@ public interface BrokerResponse {
    */
   void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);
 
+  /**
+   * Get the total number of segments pruned due to invalid data or schema.
+   *
+   * This value is always less than or equal to {@link #getNumSegmentsPrunedByServer()}
+   */
+  long getNumSegmentsPrunedInvalid();
+
+  /**
+   * Set the total number of segments pruned due to invalid data or schema.
+   *
+   * This value is always less than or equal to {@link #getNumSegmentsPrunedByServer()}
+   */
+  void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);
+
+  /**
+   * Get the total number of segments pruned by applying the limit optimization.
+   *
+   * This value is always less than or equal to {@link #getNumSegmentsPrunedByServer()}
+   */
+  long getNumSegmentsPrunedByLimit();
+
+  /**
+   * Set the total number of segments pruned by applying the limit optimization.
+   *
+   * This value is always less than or equal to {@link #getNumSegmentsPrunedByServer()}
+   */
+  void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);
+
+  /**
+   * Get the total number of segments pruned applying value optimizations, like bloom filters.
+   *
+   * This value is always less than or equal to {@link #getNumSegmentsPrunedByServer()}
+   */
+  long getNumSegmentsPrunedByValue();
+
+  /**
+   * Set the total number of segments pruned applying value optimizations, like bloom filters.
+   *
+   * This value is always less than or equal to {@link #getNumSegmentsPrunedByServer()}
+   */
+  void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);
+
   /**
    * Get the total number of segments with an EmptyFilterOperator when Explain Plan is called
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index a8fdd7a9d5..d9df0dc67e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -83,6 +83,9 @@ public class BrokerResponseNative implements BrokerResponse {
   private long _realtimeTotalCpuTimeNs = 0L;
   private long _numSegmentsPrunedByBroker = 0L;
   private long _numSegmentsPrunedByServer = 0L;
+  private long _numSegmentsPrunedInvalid = 0L;
+  private long _numSegmentsPrunedByLimit = 0L;
+  private long _numSegmentsPrunedByValue = 0L;
   private long _explainPlanNumEmptyFilterSegments = 0L;
   private long _explainPlanNumMatchAllFilterSegments = 0L;
   private int _numRowsResultSet = 0;
@@ -245,6 +248,42 @@ public class BrokerResponseNative implements BrokerResponse {
     _numSegmentsPrunedByServer = numSegmentsPrunedByServer;
   }
 
+  @JsonProperty("numSegmentsPrunedInvalid")
+  @Override
+  public long getNumSegmentsPrunedInvalid() {
+    return _numSegmentsPrunedInvalid;
+  }
+
+  @JsonProperty("numSegmentsPrunedInvalid")
+  @Override
+  public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
+    _numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
+  }
+
+  @JsonProperty("numSegmentsPrunedByLimit")
+  @Override
+  public long getNumSegmentsPrunedByLimit() {
+    return _numSegmentsPrunedByLimit;
+  }
+
+  @JsonProperty("numSegmentsPrunedByLimit")
+  @Override
+  public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
+    _numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
+  }
+
+  @JsonProperty("numSegmentsPrunedByValue")
+  @Override
+  public long getNumSegmentsPrunedByValue() {
+    return _numSegmentsPrunedByValue;
+  }
+
+  @JsonProperty("numSegmentsPrunedByValue")
+  @Override
+  public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
+    _numSegmentsPrunedByValue = numSegmentsPrunedByValue;
+  }
+
   @JsonProperty("explainPlanNumEmptyFilterSegments")
   @Override
   public long getExplainPlanNumEmptyFilterSegments() {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index c01af3ab11..c27565b0ec 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -116,6 +116,9 @@ public interface DataTable {
     SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", MetadataValueType.LONG),
     RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", MetadataValueType.LONG),
     NUM_SEGMENTS_PRUNED_BY_SERVER("numSegmentsPrunedByServer", MetadataValueType.INT),
+    NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedByInvalid", MetadataValueType.INT),
+    NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", MetadataValueType.INT),
+    NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", MetadataValueType.INT),
     EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS("explainPlanNumEmptyFilterSegments", MetadataValueType.INT),
     EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS("explainPlanNumMatchAllFilterSegments", MetadataValueType.INT);
 
diff --git a/pinot-controller/src/main/resources/app/pages/Query.tsx b/pinot-controller/src/main/resources/app/pages/Query.tsx
index 79747be1cf..99c691f8e9 100644
--- a/pinot-controller/src/main/resources/app/pages/Query.tsx
+++ b/pinot-controller/src/main/resources/app/pages/Query.tsx
@@ -190,6 +190,8 @@ const QueryPage = () => {
     records: [],
   });
 
+  const [warnings, setWarnings] = useState<Array<string>>([]);
+
   const [checked, setChecked] = React.useState({
     tracing: queryParam.get('tracing') === 'true',
     showResultJSON: false,
@@ -295,10 +297,22 @@ const QueryPage = () => {
     setResultData(results.result || { columns: [], records: [] });
     setQueryStats(results.queryStats || { columns: responseStatCols, records: [] });
     setOutputResult(JSON.stringify(results.data, null, 2) || '');
+    setWarnings(extractWarnings(results));
     setQueryLoader(false);
     queryExecuted.current = false;
   };
 
+  const extractWarnings = (result) => {
+    const warnings: Array<string> = [];
+    const numSegmentsPrunedInvalid = result.data.numSegmentsPrunedInvalid;
+    if (numSegmentsPrunedInvalid) {
+      warnings.push(`There are ${numSegmentsPrunedInvalid} invalid segment/s. This usually means that they were `
+         + `created with an older schema. `
+         + `Please reload the table in order to refresh these segments to the new schema.`);
+    }
+    return warnings;
+  }
+
   const fetchSQLData = async (tableName) => {
     setQueryLoader(true);
     const result = await PinotMethodUtils.getTableSchemaData(tableName);
@@ -517,6 +531,13 @@ const QueryPage = () => {
                   </Alert>
                 ) : (
                   <>
+                    {
+                    warnings.map(warn =>
+                        <Alert severity="warning" className={classes.sqlError}>
+                          {warn}
+                        </Alert>
+                      )
+                    }
                     <Grid item xs style={{ backgroundColor: 'white' }}>
                       {resultData.columns.length ? (
                         <>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java
index 98c6c3ee0c..604bf10801 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  * Config for SegmentPruner.
  */
 public class SegmentPrunerConfig {
-  private static final String SEGMENT_PRUNER_NAMES_KEY = "class";
+  public static final String SEGMENT_PRUNER_NAMES_KEY = "class";
 
   private final int _numSegmentPruners;
   private final List<String> _segmentPrunerNames;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index e0c086f5c3..851e5f21b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -58,6 +58,7 @@ import org.apache.pinot.core.plan.maker.PlanMaker;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
 import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
@@ -289,7 +290,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
 
     TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
     int totalSegments = indexSegments.size();
-    List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext);
+    SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics();
+    List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext, prunerStats);
     segmentPruneTimer.stopAndRecord();
     int numSelectedSegments = selectedSegments.size();
     LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
@@ -310,6 +312,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
       metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
       metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(totalSegments));
+      addPrunerStats(metadata, prunerStats);
       return dataTable;
     } else {
       TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
@@ -322,12 +325,14 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       DataTable dataTable = queryContext.isExplain() ? processExplainPlanQueries(queryPlan) : queryPlan.execute();
       planExecTimer.stopAndRecord();
 
+      Map<String, String> metadata = dataTable.getMetadata();
       // Update the total docs in the metadata based on the un-pruned segments
-      dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs));
+      metadata.put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs));
 
       // Set the number of pruned segments. This count does not include the segments which returned empty filters
       int prunedSegments = totalSegments - numSelectedSegments;
-      dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments));
+      metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments));
+      addPrunerStats(metadata, prunerStats);
 
       return dataTable;
     }
@@ -575,4 +580,10 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       }
     }
   }
+
+  private void addPrunerStats(Map<String, String> metadata, SegmentPrunerStatistics prunerStats) {
+    metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), String.valueOf(prunerStats.getInvalidSegments()));
+    metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), String.valueOf(prunerStats.getLimitPruned()));
+    metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), String.valueOf(prunerStats.getValuePruned()));
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
index 2d06bd2df6..7953db71e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
@@ -40,6 +40,8 @@ public interface SegmentPruner {
   /**
    * Prunes the segments based on the query, returns the segments that are not pruned.
    * <p>Override this method for the pruner logic.
+   *
+   * @param segments The list of segments to be pruned. Implementations must not modify the list.
    */
   List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
index dd392d15e7..e09d54590d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
@@ -35,9 +35,12 @@ public class SegmentPrunerProvider {
 
   private static final Map<String, Class<? extends SegmentPruner>> PRUNER_MAP = new HashMap<>();
 
+  public static final String COLUMN_VALUE_SEGMENT_PRUNER_NAME = "columnvaluesegmentpruner";
+  public static final String SELECTION_QUERY_SEGMENT_PRUNER_NAME = "selectionquerysegmentpruner";
+
   static {
-    PRUNER_MAP.put("columnvaluesegmentpruner", ColumnValueSegmentPruner.class);
-    PRUNER_MAP.put("selectionquerysegmentpruner", SelectionQuerySegmentPruner.class);
+    PRUNER_MAP.put(COLUMN_VALUE_SEGMENT_PRUNER_NAME, ColumnValueSegmentPruner.class);
+    PRUNER_MAP.put(SELECTION_QUERY_SEGMENT_PRUNER_NAME, SelectionQuerySegmentPruner.class);
   }
 
   @Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index a53191e4b7..5c79ca1c2d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -19,7 +19,10 @@
 package org.apache.pinot.core.query.pruner;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
 import org.apache.pinot.core.query.config.SegmentPrunerConfig;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -37,9 +40,11 @@ public class SegmentPrunerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPrunerService.class);
 
   private final List<SegmentPruner> _segmentPruners;
+  private final Map<SegmentPruner, BiConsumer<SegmentPrunerStatistics, Integer>> _prunerStatsUpdaters;
 
   public SegmentPrunerService(SegmentPrunerConfig config) {
     int numPruners = config.numSegmentPruners();
+    _prunerStatsUpdaters = new HashMap<>();
     _segmentPruners = new ArrayList<>(numPruners);
 
     for (int i = 0; i < numPruners; i++) {
@@ -49,25 +54,59 @@ public class SegmentPrunerService {
           config.getSegmentPrunerConfig(i));
       if (pruner != null) {
         _segmentPruners.add(pruner);
+        switch (segmentPrunerName.toLowerCase()) {
+          case SegmentPrunerProvider.SELECTION_QUERY_SEGMENT_PRUNER_NAME:
+            _prunerStatsUpdaters.put(pruner, SegmentPrunerStatistics::setLimitPruned);
+            break;
+          case SegmentPrunerProvider.COLUMN_VALUE_SEGMENT_PRUNER_NAME:
+            _prunerStatsUpdaters.put(pruner, SegmentPrunerStatistics::setValuePruned);
+            break;
+          default:
+            _prunerStatsUpdaters.put(pruner, (stats, value) -> { });
+            break;
+        }
       } else {
         LOGGER.warn("could not create segment pruner: {}", segmentPrunerName);
       }
     }
+    assert _segmentPruners.stream()
+        .allMatch(_prunerStatsUpdaters::containsKey)
+        : "No defined stats updater for pruner " + _segmentPruners.stream()
+        .filter(p -> !_prunerStatsUpdaters.containsKey(p))
+        .findAny().orElseThrow(IllegalStateException::new);
   }
 
   /**
    * Prunes the segments based on the query request, returns the segments that are not pruned.
+   *
+   * @deprecated this method is here for compatibility reasons and may be removed soon.
+   * Call {@link #prune(List, QueryContext, SegmentPrunerStatistics)} instead
+   * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an
+   *                 undefined way. Therefore, this list should not be used after calling this method.
    */
+  @Deprecated
   public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) {
+    return prune(segments, query, new SegmentPrunerStatistics());
+  }
+
+  /**
+   * Prunes the segments based on the query request, returns the segments that are not pruned.
+   *
+   * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an
+   *                 undefined way. Therefore, this list should not be used after calling this method.
+   */
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query, SegmentPrunerStatistics stats) {
     try (InvocationScope scope = Tracing.getTracer().createScope(SegmentPrunerService.class)) {
-      segments = removeInvalidSegments(segments, query);
+      segments = removeInvalidSegments(segments, query, stats);
       int invokedPrunersCount = 0;
       for (SegmentPruner segmentPruner : _segmentPruners) {
         if (segmentPruner.isApplicableTo(query)) {
           invokedPrunersCount++;
           try (InvocationScope prunerScope = Tracing.getTracer().createScope(segmentPruner.getClass())) {
-            prunerScope.setNumSegments(segments.size());
+            int originalSegmentsSize = segments.size();
+            prunerScope.setNumSegments(originalSegmentsSize);
             segments = segmentPruner.prune(segments, query);
+            _prunerStatsUpdaters.get(segmentPruner).accept(stats, originalSegmentsSize - segments.size());
           }
         }
       }
@@ -76,18 +115,41 @@ public class SegmentPrunerService {
     return segments;
   }
 
-  private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query) {
+  /**
+   * Filters the given list, returning a list that only contains the valid segments, modifying the list received as
+   * argument.
+   *
+   * <p>
+   * This is a destructive operation. The list received as an argument may be modified, so only the returned list
+   * should be used.
+   * </p>
+   *
+   * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an
+   *                 undefined way. Therefore, this list should not be used after calling this method.
+   * @return the new list with filtered elements. This is the list that has to be used.
+   */
+  private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query,
+      SegmentPrunerStatistics stats) {
     int selected = 0;
+    int invalid = 0;
     for (IndexSegment segment : segments) {
-      if (!isInvalidSegment(segment, query)) {
-        segments.set(selected++, segment);
+      if (!isEmptySegment(segment)) {
+        if (isInvalidSegment(segment, query)) {
+          invalid++;
+        } else {
+          segments.set(selected++, segment);
+        }
       }
     }
+    stats.setInvalidSegments(invalid);
     return segments.subList(0, selected);
   }
 
+  private static boolean isEmptySegment(IndexSegment segment) {
+    return segment.getSegmentMetadata().getTotalDocs() == 0;
+  }
+
   private static boolean isInvalidSegment(IndexSegment segment, QueryContext query) {
-    return segment.getSegmentMetadata().getTotalDocs() == 0
-        || !segment.getColumnNames().containsAll(query.getColumns());
+    return !segment.getColumnNames().containsAll(query.getColumns());
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java
similarity index 54%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java
index 2d06bd2df6..39d068cf80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java
@@ -18,28 +18,35 @@
  */
 package org.apache.pinot.core.query.pruner;
 
-import java.util.List;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-
-public interface SegmentPruner {
-
-  /**
-   * Initializes the segment pruner.
-   */
-  void init(PinotConfiguration config);
-
-  /**
-   * Inspect the query context to determine if the pruner should be applied
-   * @return true if the pruner applies to the query
-   */
-  boolean isApplicableTo(QueryContext query);
-
-  /**
-   * Prunes the segments based on the query, returns the segments that are not pruned.
-   * <p>Override this method for the pruner logic.
-   */
-  List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query);
+public class SegmentPrunerStatistics {
+
+  private int _invalidSegments;
+
+  private int _valuePruned;
+
+  private int _limitPruned;
+
+  public int getInvalidSegments() {
+    return _invalidSegments;
+  }
+
+  public void setInvalidSegments(int invalidSegments) {
+    _invalidSegments = invalidSegments;
+  }
+
+  public int getValuePruned() {
+    return _valuePruned;
+  }
+
+  public void setValuePruned(int valuePruned) {
+    _valuePruned = valuePruned;
+  }
+
+  public int getLimitPruned() {
+    return _limitPruned;
+  }
+
+  public void setLimitPruned(int limitPruned) {
+    _limitPruned = limitPruned;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 662c08cabc..dc3a450d50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongConsumer;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -143,6 +144,9 @@ public abstract class BaseReduceService {
     private long _offlineTotalCpuTimeNs = 0L;
     private long _realtimeTotalCpuTimeNs = 0L;
     private long _numSegmentsPrunedByServer = 0L;
+    private long _numSegmentsPrunedInvalid = 0L;
+    private long _numSegmentsPrunedByLimit = 0L;
+    private long _numSegmentsPrunedByValue = 0L;
     private long _explainPlanNumEmptyFilterSegments = 0L;
     private long _explainPlanNumMatchAllFilterSegments = 0L;
     private boolean _numGroupsLimitReached = false;
@@ -233,10 +237,11 @@ public abstract class BaseReduceService {
       _realtimeTotalCpuTimeNs =
           _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
 
-      String numSegmentsPrunedByServer = metadata.get(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName());
-      if (numSegmentsPrunedByServer != null) {
-        _numSegmentsPrunedByServer += Long.parseLong(numSegmentsPrunedByServer);
-      }
+      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
+          l -> _numSegmentsPrunedByServer += l);
+      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, l -> _numSegmentsPrunedInvalid += l);
+      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, l -> _numSegmentsPrunedByLimit += l);
+      withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, l -> _numSegmentsPrunedByValue += l);
 
       String explainPlanNumEmptyFilterSegments =
           metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
@@ -286,6 +291,9 @@ public abstract class BaseReduceService {
       brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
       brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
       brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
+      brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
+      brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
+      brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
       brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
       brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
       if (_numConsumingSegmentsProcessed > 0) {
@@ -324,5 +332,12 @@ public abstract class BaseReduceService {
         }
       }
     }
+
+    private void withNotNullLongMetadata(Map<String, String> metadata, MetadataKey key, LongConsumer consumer) {
+      String strValue = metadata.get(key.getName());
+      if (strValue != null) {
+        consumer.accept(Long.parseLong(strValue));
+      }
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 7ca4485ca4..e4344dd1bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -175,6 +175,15 @@ public abstract class QueryScheduler {
         dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
     long numSegmentsMatched = Long.parseLong(
         dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
+    long numSegmentsPrunedInvalid = Long.parseLong(
+        dataTableMetadata.getOrDefault(
+            MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
+    long numSegmentsPrunedByLimit = Long.parseLong(
+        dataTableMetadata.getOrDefault(
+            MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
+    long numSegmentsPrunedByValue = Long.parseLong(
+        dataTableMetadata.getOrDefault(
+            MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
     long numSegmentsConsuming = Long.parseLong(
         dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
     long minConsumingFreshnessMs = Long.parseLong(
@@ -231,12 +240,14 @@ public abstract class QueryScheduler {
 
     // Please keep the format as name=value comma-separated with no spaces
     // Please add new entries at the end
-    if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
-      LOGGER.info("Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},"
+    if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) {
+      LOGGER.info("Processed requestId={},table={},"
+              + "segments(queried/processed/matched/consuming/invalid/limit/value)={}/{}/{}/{}/{}/{}/{},"
               + "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},"
               + "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
               + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
-          numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs,
+          numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
+          numSegmentsPrunedInvalid, numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
           timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
           timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
           timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
@@ -266,6 +277,12 @@ public abstract class QueryScheduler {
     _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
     _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
     _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
+    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
+        numSegmentsPrunedInvalid);
+    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+        numSegmentsPrunedByLimit);
+    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
+        numSegmentsPrunedByValue);
 
     return responseBytes;
   }
@@ -276,12 +293,17 @@ public abstract class QueryScheduler {
    * TODO: come up with other criteria for forcing a log and come up with better numbers
    *
    */
-  private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
+  private boolean forceLog(long schedulerWaitMs, long numDocsScanned, long numSegmentsPrunedInvalid) {
     // If scheduler wait time is larger than 100ms, force the log
     if (schedulerWaitMs > 100L) {
       return true;
     }
 
+    // If any segment was pruned as invalid, force the log even when the query-log rate limiter has no permit
+    if (numSegmentsPrunedInvalid > 0) {
+      return true;
+    }
+
     // If the number of document scanned is larger than 1 million rows, force the log
     return numDocsScanned > 1_000_000L;
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
new file mode 100644
index 0000000000..334cfc1d3c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.pinot.core.query.config.SegmentPrunerConfig;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/** Tests which segments {@link SegmentPrunerService} returns and which ones it reports as invalid. */
+public class SegmentPrunerServiceTest {
+  // Built in the constructor from a configuration whose pruner names property is set to "[]"
+  private final SegmentPrunerConfig _emptyPrunerConf;
+
+  public SegmentPrunerServiceTest() {
+    PinotConfiguration pinotConf = new PinotConfiguration();
+    pinotConf.setProperty(SegmentPrunerConfig.SEGMENT_PRUNER_NAMES_KEY, "[]");
+    _emptyPrunerConf = new SegmentPrunerConfig(pinotConf);
+  }
+
+  /** A segment with documents that contains the selected column is returned and not counted as invalid. */
+  @Test
+  public void notEmptyValidSegmentsAreNotPruned() {
+    IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2");
+    SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
+
+    List<IndexSegment> actual = pruneSingleSegment(indexSegment, "select col1 from t1", stats);
+
+    // Expect an independent list so the check does not rely on the list instance handed to the service
+    Assert.assertEquals(actual, Collections.singletonList(indexSegment));
+    Assert.assertEquals(stats.getInvalidSegments(), 0);
+  }
+
+  /** A segment with zero documents is not returned, yet it is not counted as invalid either. */
+  @Test
+  public void emptySegmentsAreNotInvalid() {
+    IndexSegment indexSegment = mockIndexSegment(0, "col1", "col2");
+    SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
+
+    List<IndexSegment> actual = pruneSingleSegment(indexSegment, "select col1 from t1", stats);
+
+    Assert.assertEquals(actual, Collections.emptyList());
+    Assert.assertEquals(stats.getInvalidSegments(), 0);
+  }
+
+  /** A segment that lacks the selected column is not returned and is counted as one invalid segment. */
+  @Test
+  public void segmentsWithoutColumnAreInvalid() {
+    IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2");
+    SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
+
+    List<IndexSegment> actual = pruneSingleSegment(indexSegment, "select not_present from t1", stats);
+
+    Assert.assertEquals(actual, Collections.emptyList());
+    // TestNG's assertEquals takes (actual, expected); this order keeps the failure message accurate
+    Assert.assertEquals(stats.getInvalidSegments(), 1);
+  }
+
+  /**
+   * Runs the pruner service over a list that holds only {@code indexSegment}.
+   *
+   * @param indexSegment the single segment offered to the service
+   * @param query SQL text converted into the {@link QueryContext} passed to the service
+   * @param stats statistics object passed to the service; callers inspect it afterwards
+   * @return whatever list the service's {@code prune} call returns
+   */
+  private List<IndexSegment> pruneSingleSegment(IndexSegment indexSegment, String query,
+      SegmentPrunerStatistics stats) {
+    SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf);
+    // Kept mutable (ArrayList) in case the service edits the list it is given
+    List<IndexSegment> indexes = new ArrayList<>();
+    indexes.add(indexSegment);
+
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
+    return service.prune(indexes, queryContext, stats);
+  }
+
+  /** Mocks a segment reporting {@code totalDocs} documents in its metadata and {@code columns} as column names. */
+  private IndexSegment mockIndexSegment(int totalDocs, String... columns) {
+    IndexSegment indexSegment = mock(IndexSegment.class);
+    when(indexSegment.getColumnNames()).thenReturn(new HashSet<>(Arrays.asList(columns)));
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
+    when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    return indexSegment;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org