You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/07/05 14:50:30 UTC
[pinot] branch master updated: specify how many segments were pruned by each server segment pruner (#8884)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new de16a0a35d specify how many segments were pruned by each server segment pruner (#8884)
de16a0a35d is described below
commit de16a0a35d93f3fe0393df563015e7dd1298ceb9
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Tue Jul 5 16:50:23 2022 +0200
specify how many segments were pruned by each server segment pruner (#8884)
* specify how many segments were pruned by each server segment pruner
* force the query log when there are invalid segments
* change header to javadoc
* Add a warning section in Controller UI query view.
Initially there is only one warning, which is shown when segments were pruned because they were invalid
* Correct an assertion
* Apply suggestions from code review
Co-authored-by: Rong Rong <wa...@gmail.com>
* Don't report empty segments as invalid
* Add test on SegmentPrunerService
* Update pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
* Move new literals to the end of the enum list
* Correctly apply suggested change
* remove trailing whitespace
* fix a compilation error
Co-authored-by: Rong Rong <ro...@apache.org>
Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
---
.../apache/pinot/common/metrics/ServerMeter.java | 6 +-
.../pinot/common/response/BrokerResponse.java | 42 ++++++++
.../response/broker/BrokerResponseNative.java | 39 +++++++
.../org/apache/pinot/common/utils/DataTable.java | 3 +
.../src/main/resources/app/pages/Query.tsx | 21 ++++
.../core/query/config/SegmentPrunerConfig.java | 2 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 17 ++-
.../pinot/core/query/pruner/SegmentPruner.java | 2 +
.../core/query/pruner/SegmentPrunerProvider.java | 7 +-
.../core/query/pruner/SegmentPrunerService.java | 76 +++++++++++--
...entPruner.java => SegmentPrunerStatistics.java} | 55 +++++-----
.../pinot/core/query/reduce/BaseReduceService.java | 23 +++-
.../pinot/core/query/scheduler/QueryScheduler.java | 30 +++++-
.../query/pruner/SegmentPrunerServiceTest.java | 117 +++++++++++++++++++++
14 files changed, 394 insertions(+), 46 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 0cbd12c537..776a4e1aae 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -75,7 +75,11 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// Netty connection metrics
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
- NETTY_CONNECTION_BYTES_SENT("nettyConnection", true);
+ NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
+
+ NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedInvalid", false),
+ NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", false),
+ NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),;
private final String _meterName;
private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index aaddaaa9cb..7e269f6421 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -241,6 +241,48 @@ public interface BrokerResponse {
*/
void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);
+ /**
+ * Get the total number of segments pruned due to invalid data or schema.
+ *
+ * This value is always lower than or equal to {@link #getNumSegmentsPrunedByServer()}
+ */
+ long getNumSegmentsPrunedInvalid();
+
+ /**
+ * Set the total number of segments pruned due to invalid data or schema.
+ *
+ * This value is always lower than or equal to {@link #getNumSegmentsPrunedByServer()}
+ */
+ void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);
+
+ /**
+ * Get the total number of segments pruned by applying the limit optimization.
+ *
+ * This value is always lower than or equal to {@link #getNumSegmentsPrunedByServer()}
+ */
+ long getNumSegmentsPrunedByLimit();
+
+ /**
+ * Set the total number of segments pruned by applying the limit optimization.
+ *
+ * This value is always lower than or equal to {@link #getNumSegmentsPrunedByServer()}
+ */
+ void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);
+
+ /**
+ * Get the total number of segments pruned applying value optimizations, like bloom filters.
+ *
+ * This value is always lower than or equal to {@link #getNumSegmentsPrunedByServer()}
+ */
+ long getNumSegmentsPrunedByValue();
+
+ /**
+ * Set the total number of segments pruned applying value optimizations, like bloom filters.
+ *
+ * This value is always lower than or equal to {@link #getNumSegmentsPrunedByServer()}
+ */
+ void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);
+
/**
* Get the total number of segments with an EmptyFilterOperator when Explain Plan is called
*/
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index a8fdd7a9d5..d9df0dc67e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -83,6 +83,9 @@ public class BrokerResponseNative implements BrokerResponse {
private long _realtimeTotalCpuTimeNs = 0L;
private long _numSegmentsPrunedByBroker = 0L;
private long _numSegmentsPrunedByServer = 0L;
+ private long _numSegmentsPrunedInvalid = 0L;
+ private long _numSegmentsPrunedByLimit = 0L;
+ private long _numSegmentsPrunedByValue = 0L;
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private int _numRowsResultSet = 0;
@@ -245,6 +248,42 @@ public class BrokerResponseNative implements BrokerResponse {
_numSegmentsPrunedByServer = numSegmentsPrunedByServer;
}
+ @JsonProperty("numSegmentsPrunedInvalid")
+ @Override
+ public long getNumSegmentsPrunedInvalid() {
+ return _numSegmentsPrunedInvalid;
+ }
+
+ @JsonProperty("numSegmentsPrunedInvalid")
+ @Override
+ public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
+ _numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
+ }
+
+ @JsonProperty("numSegmentsPrunedByLimit")
+ @Override
+ public long getNumSegmentsPrunedByLimit() {
+ return _numSegmentsPrunedByLimit;
+ }
+
+ @JsonProperty("numSegmentsPrunedByLimit")
+ @Override
+ public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
+ _numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
+ }
+
+ @JsonProperty("numSegmentsPrunedByValue")
+ @Override
+ public long getNumSegmentsPrunedByValue() {
+ return _numSegmentsPrunedByValue;
+ }
+
+ @JsonProperty("numSegmentsPrunedByValue")
+ @Override
+ public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
+ _numSegmentsPrunedByValue = numSegmentsPrunedByValue;
+ }
+
@JsonProperty("explainPlanNumEmptyFilterSegments")
@Override
public long getExplainPlanNumEmptyFilterSegments() {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index c01af3ab11..c27565b0ec 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -116,6 +116,9 @@ public interface DataTable {
SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", MetadataValueType.LONG),
RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", MetadataValueType.LONG),
NUM_SEGMENTS_PRUNED_BY_SERVER("numSegmentsPrunedByServer", MetadataValueType.INT),
+ NUM_SEGMENTS_PRUNED_INVALID("numSegmentsPrunedByInvalid", MetadataValueType.INT),
+ NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", MetadataValueType.INT),
+ NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", MetadataValueType.INT),
EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS("explainPlanNumEmptyFilterSegments", MetadataValueType.INT),
EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS("explainPlanNumMatchAllFilterSegments", MetadataValueType.INT);
diff --git a/pinot-controller/src/main/resources/app/pages/Query.tsx b/pinot-controller/src/main/resources/app/pages/Query.tsx
index 79747be1cf..99c691f8e9 100644
--- a/pinot-controller/src/main/resources/app/pages/Query.tsx
+++ b/pinot-controller/src/main/resources/app/pages/Query.tsx
@@ -190,6 +190,8 @@ const QueryPage = () => {
records: [],
});
+ const [warnings, setWarnings] = useState<Array<string>>([]);
+
const [checked, setChecked] = React.useState({
tracing: queryParam.get('tracing') === 'true',
showResultJSON: false,
@@ -295,10 +297,22 @@ const QueryPage = () => {
setResultData(results.result || { columns: [], records: [] });
setQueryStats(results.queryStats || { columns: responseStatCols, records: [] });
setOutputResult(JSON.stringify(results.data, null, 2) || '');
+ setWarnings(extractWarnings(results));
setQueryLoader(false);
queryExecuted.current = false;
};
+ const extractWarnings = (result) => {
+ const warnings: Array<string> = [];
+ const numSegmentsPrunedInvalid = result.data.numSegmentsPrunedInvalid;
+ if (numSegmentsPrunedInvalid) {
+ warnings.push(`There are ${numSegmentsPrunedInvalid} invalid segment/s. This usually means that they were `
+ + `created with an older schema. `
+ + `Please reload the table in order to refresh these segments to the new schema.`);
+ }
+ return warnings;
+ }
+
const fetchSQLData = async (tableName) => {
setQueryLoader(true);
const result = await PinotMethodUtils.getTableSchemaData(tableName);
@@ -517,6 +531,13 @@ const QueryPage = () => {
</Alert>
) : (
<>
+ {
+ warnings.map(warn =>
+ <Alert severity="warning" className={classes.sqlError}>
+ {warn}
+ </Alert>
+ )
+ }
<Grid item xs style={{ backgroundColor: 'white' }}>
{resultData.columns.length ? (
<>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java
index 98c6c3ee0c..604bf10801 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/config/SegmentPrunerConfig.java
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
* Config for SegmentPruner.
*/
public class SegmentPrunerConfig {
- private static final String SEGMENT_PRUNER_NAMES_KEY = "class";
+ public static final String SEGMENT_PRUNER_NAMES_KEY = "class";
private final int _numSegmentPruners;
private final List<String> _segmentPrunerNames;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index e0c086f5c3..851e5f21b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -58,6 +58,7 @@ import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
@@ -289,7 +290,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
int totalSegments = indexSegments.size();
- List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext);
+ SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics();
+ List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext, prunerStats);
segmentPruneTimer.stopAndRecord();
int numSelectedSegments = selectedSegments.size();
LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
@@ -310,6 +312,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(totalSegments));
+ addPrunerStats(metadata, prunerStats);
return dataTable;
} else {
TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
@@ -322,12 +325,14 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
DataTable dataTable = queryContext.isExplain() ? processExplainPlanQueries(queryPlan) : queryPlan.execute();
planExecTimer.stopAndRecord();
+ Map<String, String> metadata = dataTable.getMetadata();
// Update the total docs in the metadata based on the un-pruned segments
- dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs));
+ metadata.put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs));
// Set the number of pruned segments. This count does not include the segments which returned empty filters
int prunedSegments = totalSegments - numSelectedSegments;
- dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments));
+ metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments));
+ addPrunerStats(metadata, prunerStats);
return dataTable;
}
@@ -575,4 +580,10 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
}
}
+
+ private void addPrunerStats(Map<String, String> metadata, SegmentPrunerStatistics prunerStats) {
+ metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), String.valueOf(prunerStats.getInvalidSegments()));
+ metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), String.valueOf(prunerStats.getLimitPruned()));
+ metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), String.valueOf(prunerStats.getValuePruned()));
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
index 2d06bd2df6..7953db71e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
@@ -40,6 +40,8 @@ public interface SegmentPruner {
/**
* Prunes the segments based on the query, returns the segments that are not pruned.
* <p>Override this method for the pruner logic.
+ *
+ * @param segments The list of segments to be pruned. Implementations must not modify the list.
*/
List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
index dd392d15e7..e09d54590d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerProvider.java
@@ -35,9 +35,12 @@ public class SegmentPrunerProvider {
private static final Map<String, Class<? extends SegmentPruner>> PRUNER_MAP = new HashMap<>();
+ public static final String COLUMN_VALUE_SEGMENT_PRUNER_NAME = "columnvaluesegmentpruner";
+ public static final String SELECTION_QUERY_SEGMENT_PRUNER_NAME = "selectionquerysegmentpruner";
+
static {
- PRUNER_MAP.put("columnvaluesegmentpruner", ColumnValueSegmentPruner.class);
- PRUNER_MAP.put("selectionquerysegmentpruner", SelectionQuerySegmentPruner.class);
+ PRUNER_MAP.put(COLUMN_VALUE_SEGMENT_PRUNER_NAME, ColumnValueSegmentPruner.class);
+ PRUNER_MAP.put(SELECTION_QUERY_SEGMENT_PRUNER_NAME, SelectionQuerySegmentPruner.class);
}
@Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index a53191e4b7..5c79ca1c2d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -19,7 +19,10 @@
package org.apache.pinot.core.query.pruner;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
import org.apache.pinot.core.query.config.SegmentPrunerConfig;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -37,9 +40,11 @@ public class SegmentPrunerService {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPrunerService.class);
private final List<SegmentPruner> _segmentPruners;
+ private final Map<SegmentPruner, BiConsumer<SegmentPrunerStatistics, Integer>> _prunerStatsUpdaters;
public SegmentPrunerService(SegmentPrunerConfig config) {
int numPruners = config.numSegmentPruners();
+ _prunerStatsUpdaters = new HashMap<>();
_segmentPruners = new ArrayList<>(numPruners);
for (int i = 0; i < numPruners; i++) {
@@ -49,25 +54,59 @@ public class SegmentPrunerService {
config.getSegmentPrunerConfig(i));
if (pruner != null) {
_segmentPruners.add(pruner);
+ switch (segmentPrunerName.toLowerCase()) {
+ case SegmentPrunerProvider.SELECTION_QUERY_SEGMENT_PRUNER_NAME:
+ _prunerStatsUpdaters.put(pruner, SegmentPrunerStatistics::setLimitPruned);
+ break;
+ case SegmentPrunerProvider.COLUMN_VALUE_SEGMENT_PRUNER_NAME:
+ _prunerStatsUpdaters.put(pruner, SegmentPrunerStatistics::setValuePruned);
+ break;
+ default:
+ _prunerStatsUpdaters.put(pruner, (stats, value) -> { });
+ break;
+ }
} else {
LOGGER.warn("could not create segment pruner: {}", segmentPrunerName);
}
}
+ assert _segmentPruners.stream()
+ .allMatch(_prunerStatsUpdaters::containsKey)
+ : "No defined stats updater for pruner " + _segmentPruners.stream()
+ .filter(p -> !_prunerStatsUpdaters.containsKey(p))
+ .findAny().orElseThrow(IllegalStateException::new);
}
/**
* Prunes the segments based on the query request, returns the segments that are not pruned.
+ *
+ * @deprecated this method is here for compatibility reasons and may be removed soon.
+ * Call {@link #prune(List, QueryContext, SegmentPrunerStatistics)} instead
+ * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an
+ * undefined way. Therefore, this list should not be used after calling this method.
*/
+ @Deprecated
public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) {
+ return prune(segments, query, new SegmentPrunerStatistics());
+ }
+
+ /**
+ * Prunes the segments based on the query request, returns the segments that are not pruned.
+ *
+ * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an
+ * undefined way. Therefore, this list should not be used after calling this method.
+ */
+ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query, SegmentPrunerStatistics stats) {
try (InvocationScope scope = Tracing.getTracer().createScope(SegmentPrunerService.class)) {
- segments = removeInvalidSegments(segments, query);
+ segments = removeInvalidSegments(segments, query, stats);
int invokedPrunersCount = 0;
for (SegmentPruner segmentPruner : _segmentPruners) {
if (segmentPruner.isApplicableTo(query)) {
invokedPrunersCount++;
try (InvocationScope prunerScope = Tracing.getTracer().createScope(segmentPruner.getClass())) {
- prunerScope.setNumSegments(segments.size());
+ int originalSegmentsSize = segments.size();
+ prunerScope.setNumSegments(originalSegmentsSize);
segments = segmentPruner.prune(segments, query);
+ _prunerStatsUpdaters.get(segmentPruner).accept(stats, originalSegmentsSize - segments.size());
}
}
}
@@ -76,18 +115,41 @@ public class SegmentPrunerService {
return segments;
}
- private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query) {
+ /**
+ * Filters the given list, returning a list that only contains the valid segments, modifying the list received as
+ * argument.
+ *
+ * <p>
+ * This is a destructive operation. The list received as an argument may be modified, so only the returned list should
+ * be used.
+ * </p>
+ *
+ * @param segments the list of segments to be pruned. This is a destructive operation that may modify this list in an
+ * undefined way. Therefore, this list should not be used after calling this method.
+ * @return the new list with filtered elements. This is the list that has to be used.
+ */
+ private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query,
+ SegmentPrunerStatistics stats) {
int selected = 0;
+ int invalid = 0;
for (IndexSegment segment : segments) {
- if (!isInvalidSegment(segment, query)) {
- segments.set(selected++, segment);
+ if (!isEmptySegment(segment)) {
+ if (isInvalidSegment(segment, query)) {
+ invalid++;
+ } else {
+ segments.set(selected++, segment);
+ }
}
}
+ stats.setInvalidSegments(invalid);
return segments.subList(0, selected);
}
+ private static boolean isEmptySegment(IndexSegment segment) {
+ return segment.getSegmentMetadata().getTotalDocs() == 0;
+ }
+
private static boolean isInvalidSegment(IndexSegment segment, QueryContext query) {
- return segment.getSegmentMetadata().getTotalDocs() == 0
- || !segment.getColumnNames().containsAll(query.getColumns());
+ return !segment.getColumnNames().containsAll(query.getColumns());
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java
similarity index 54%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java
index 2d06bd2df6..39d068cf80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerStatistics.java
@@ -18,28 +18,35 @@
*/
package org.apache.pinot.core.query.pruner;
-import java.util.List;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-
-public interface SegmentPruner {
-
- /**
- * Initializes the segment pruner.
- */
- void init(PinotConfiguration config);
-
- /**
- * Inspect the query context to determine if the pruner should be applied
- * @return true if the pruner applies to the query
- */
- boolean isApplicableTo(QueryContext query);
-
- /**
- * Prunes the segments based on the query, returns the segments that are not pruned.
- * <p>Override this method for the pruner logic.
- */
- List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query);
+public class SegmentPrunerStatistics {
+
+ private int _invalidSegments;
+
+ private int _valuePruned;
+
+ private int _limitPruned;
+
+ public int getInvalidSegments() {
+ return _invalidSegments;
+ }
+
+ public void setInvalidSegments(int invalidSegments) {
+ _invalidSegments = invalidSegments;
+ }
+
+ public int getValuePruned() {
+ return _valuePruned;
+ }
+
+ public void setValuePruned(int valuePruned) {
+ _valuePruned = valuePruned;
+ }
+
+ public int getLimitPruned() {
+ return _limitPruned;
+ }
+
+ public void setLimitPruned(int limitPruned) {
+ _limitPruned = limitPruned;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 662c08cabc..dc3a450d50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.LongConsumer;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -143,6 +144,9 @@ public abstract class BaseReduceService {
private long _offlineTotalCpuTimeNs = 0L;
private long _realtimeTotalCpuTimeNs = 0L;
private long _numSegmentsPrunedByServer = 0L;
+ private long _numSegmentsPrunedInvalid = 0L;
+ private long _numSegmentsPrunedByLimit = 0L;
+ private long _numSegmentsPrunedByValue = 0L;
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private boolean _numGroupsLimitReached = false;
@@ -233,10 +237,11 @@ public abstract class BaseReduceService {
_realtimeTotalCpuTimeNs =
_realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
- String numSegmentsPrunedByServer = metadata.get(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName());
- if (numSegmentsPrunedByServer != null) {
- _numSegmentsPrunedByServer += Long.parseLong(numSegmentsPrunedByServer);
- }
+ withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
+ l -> _numSegmentsPrunedByServer += l);
+ withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, l -> _numSegmentsPrunedInvalid += l);
+ withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, l -> _numSegmentsPrunedByLimit += l);
+ withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, l -> _numSegmentsPrunedByValue += l);
String explainPlanNumEmptyFilterSegments =
metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
@@ -286,6 +291,9 @@ public abstract class BaseReduceService {
brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
+ brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
+ brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
+ brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
if (_numConsumingSegmentsProcessed > 0) {
@@ -324,5 +332,12 @@ public abstract class BaseReduceService {
}
}
}
+
+ private void withNotNullLongMetadata(Map<String, String> metadata, MetadataKey key, LongConsumer consumer) {
+ String strValue = metadata.get(key.getName());
+ if (strValue != null) {
+ consumer.accept(Long.parseLong(strValue));
+ }
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 7ca4485ca4..e4344dd1bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -175,6 +175,15 @@ public abstract class QueryScheduler {
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsMatched = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
+ long numSegmentsPrunedInvalid = Long.parseLong(
+ dataTableMetadata.getOrDefault(
+ MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
+ long numSegmentsPrunedByLimit = Long.parseLong(
+ dataTableMetadata.getOrDefault(
+ MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
+ long numSegmentsPrunedByValue = Long.parseLong(
+ dataTableMetadata.getOrDefault(
+ MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsConsuming = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
long minConsumingFreshnessMs = Long.parseLong(
@@ -231,12 +240,14 @@ public abstract class QueryScheduler {
// Please keep the format as name=value comma-separated with no spaces
// Please add new entries at the end
- if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
- LOGGER.info("Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},"
+ if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) {
+ LOGGER.info("Processed requestId={},table={},"
+ + "segments(queried/processed/matched/consuming/invalid/limit/value)={}/{}/{}/{}/{}/{}/{},"
+ "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},"
+ "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+ "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
- numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs,
+ numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
+ numSegmentsPrunedInvalid, numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
@@ -266,6 +277,12 @@ public abstract class QueryScheduler {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
+ _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
+ numSegmentsPrunedInvalid);
+ _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+ numSegmentsPrunedByLimit);
+ _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
+ numSegmentsPrunedByValue);
return responseBytes;
}
@@ -276,12 +293,17 @@ public abstract class QueryScheduler {
* TODO: come up with other criteria for forcing a log and come up with better numbers
*
*/
- private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
+ private boolean forceLog(long schedulerWaitMs, long numDocsScanned, long numSegmentsPrunedInvalid) {
// If scheduler wait time is larger than 100ms, force the log
if (schedulerWaitMs > 100L) {
return true;
}
+ // If there are invalid segments, force the log
+ if (numSegmentsPrunedInvalid > 0) {
+ return true;
+ }
+
// If the number of document scanned is larger than 1 million rows, force the log
return numDocsScanned > 1_000_000L;
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
new file mode 100644
index 0000000000..334cfc1d3c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.pruner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.pinot.core.query.config.SegmentPrunerConfig;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class SegmentPrunerServiceTest {
+ private final SegmentPrunerConfig _emptyPrunerConf;
+
+ public SegmentPrunerServiceTest() {
+ PinotConfiguration pinotConf = new PinotConfiguration();
+ pinotConf.setProperty(SegmentPrunerConfig.SEGMENT_PRUNER_NAMES_KEY, "[]");
+ _emptyPrunerConf = new SegmentPrunerConfig(pinotConf);
+ }
+
+ @Test
+ public void notEmptyValidSegmentsAreNotPruned() {
+ SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf);
+ IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2");
+
+ SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
+
+ List<IndexSegment> indexes = new ArrayList<>();
+ indexes.add(indexSegment);
+
+ String query = "select col1 from t1";
+
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
+
+ List<IndexSegment> actual = service.prune(indexes, queryContext, stats);
+
+ Assert.assertEquals(actual, indexes);
+ Assert.assertEquals(stats.getInvalidSegments(), 0);
+ }
+
+ @Test
+ public void emptySegmentsAreNotInvalid() {
+ SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf);
+ IndexSegment indexSegment = mockIndexSegment(0, "col1", "col2");
+
+ SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
+
+ List<IndexSegment> indexes = new ArrayList<>();
+ indexes.add(indexSegment);
+
+ String query = "select col1 from t1";
+
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
+
+ List<IndexSegment> actual = service.prune(indexes, queryContext, stats);
+
+ Assert.assertEquals(actual, Collections.emptyList());
+ Assert.assertEquals(stats.getInvalidSegments(), 0);
+ }
+
+ @Test
+ public void segmentsWithoutColumnAreInvalid() {
+ SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf);
+ IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2");
+
+ SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
+
+ List<IndexSegment> indexes = new ArrayList<>();
+ indexes.add(indexSegment);
+
+ String query = "select not_present from t1";
+
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
+
+ List<IndexSegment> actual = service.prune(indexes, queryContext, stats);
+
+ Assert.assertEquals(actual, Collections.emptyList());
+ Assert.assertEquals(1, stats.getInvalidSegments());
+ }
+
+ private IndexSegment mockIndexSegment(int totalDocs, String... columns) {
+ IndexSegment indexSegment = mock(IndexSegment.class);
+ when(indexSegment.getColumnNames()).thenReturn(new HashSet<>(Arrays.asList(columns)));
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
+ when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ return indexSegment;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org