You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/03/22 19:04:56 UTC
[pinot] branch master updated: Support NOT in TimeSegmentPruner (#8381)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bc96b13 Support NOT in TimeSegmentPruner (#8381)
bc96b13 is described below
commit bc96b13333a5d7da65b3f6b9f05d355a537e9647
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Mar 22 12:04:37 2022 -0700
Support NOT in TimeSegmentPruner (#8381)
---
.../routing/segmentpruner/TimeSegmentPruner.java | 31 +++++++++++
.../routing/segmentpruner/SegmentPrunerTest.java | 60 +++++++++++++++++-----
2 files changed, 77 insertions(+), 14 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index e84dabc..0b1984e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -200,6 +200,10 @@ public class TimeSegmentPruner implements SegmentPruner {
* < 50 OR firstName = Jason')
* Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND
* time > 100')
+ * Sorted time intervals without overlapping if time condition is valid
+ *
+ * TODO: 1. Merge adjacent intervals
+ * 2. Set interval boundary using time granularity instead of millis
*/
@Nullable
private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
@@ -233,6 +237,14 @@ public class TimeSegmentPruner implements SegmentPruner {
}
}
return getUnionSortedIntervals(orIntervals);
+ case NOT:
+ assert operands.size() == 1;
+ List<Interval> childIntervals = getFilterTimeIntervals(operands.get(0));
+ if (childIntervals == null) {
+ return null;
+ } else {
+ return getComplementSortedIntervals(childIntervals);
+ }
case EQUALS: {
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null && identifier.getName().equals(_timeColumn)) {
@@ -479,6 +491,25 @@ public class TimeSegmentPruner implements SegmentPruner {
}
/**
+ * Returns the complement (non-overlapping sorted intervals) of the given non-overlapping sorted intervals.
+ */
+ private List<Interval> getComplementSortedIntervals(List<Interval> intervals) {
+ List<Interval> res = new ArrayList<>();
+ long startTime = MIN_START_TIME;
+ for (Interval interval : intervals) {
+ if (interval._min > startTime) {
+ res.add(new Interval(startTime, interval._min - 1));
+ }
+ if (interval._max == MAX_END_TIME) {
+ return res;
+ }
+ startTime = interval._max + 1;
+ }
+ res.add(new Interval(startTime, MAX_END_TIME));
+ return res;
+ }
+
+ /**
* Parse interval to millisecond as [min, max] with both sides included.
* E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310]
*/
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 9f330e0..ad0f995 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -80,13 +80,13 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String QUERY_2 = "SELECT * FROM testTable where memberId = 0";
private static final String QUERY_3 = "SELECT * FROM testTable where memberId IN (1, 2)";
- private static final String QUERY_5 = "SELECT * FROM testTable where timeColumn = 40";
- private static final String QUERY_6 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
- private static final String QUERY_7 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
- private static final String QUERY_8 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
- private static final String QUERY_9 =
+ private static final String TIME_QUERY_1 = "SELECT * FROM testTable where timeColumn = 40";
+ private static final String TIME_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
+ private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
+ private static final String TIME_QUERY_4 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
+ private static final String TIME_QUERY_5 =
"SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)";
- private static final String QUERY_10 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";
+ private static final String TIME_QUERY_6 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";
private static final String SDF_QUERY_1 = "SELECT * FROM testTable where timeColumn = 20200131";
private static final String SDF_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20200101 AND 20200331";
@@ -97,6 +97,9 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String SDF_QUERY_5 =
"SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
+ private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30";
+ private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable WHERE NOT timeColumn > 30";
+
// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";
@@ -234,8 +237,8 @@ public class SegmentPrunerTest extends ControllerTest {
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
- when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
- Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
+ when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
+ Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
when(indexingConfig.getStreamConfigs()).thenReturn(
Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
@@ -332,12 +335,12 @@ public class SegmentPrunerTest extends ControllerTest {
@Test(dataProvider = "compilerProvider")
public void testTimeSegmentPruner(QueryCompiler compiler) {
BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
- BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_5);
- BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_6);
- BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(QUERY_7);
- BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(QUERY_8);
- BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(QUERY_9);
- BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(QUERY_10);
+ BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(TIME_QUERY_1);
+ BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(TIME_QUERY_2);
+ BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(TIME_QUERY_3);
+ BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(TIME_QUERY_4);
+ BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(TIME_QUERY_5);
+ BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(TIME_QUERY_6);
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
@@ -516,6 +519,35 @@ public class SegmentPrunerTest extends ControllerTest {
assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet());
}
+ @Test
+ public void testTimeSegmentPrunerSql() {
+ CalciteSqlCompiler compiler = new CalciteSqlCompiler();
+ BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(SQL_TIME_QUERY_1);
+ BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(SQL_TIME_QUERY_2);
+ // NOTE: Ideal state and external view are not used in the current implementation
+ IdealState idealState = Mockito.mock(IdealState.class);
+ ExternalView externalView = Mockito.mock(ExternalView.class);
+
+ TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
+ setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
+
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore);
+ Set<String> onlineSegments = new HashSet<>();
+ String segment0 = "segment0";
+ onlineSegments.add(segment0);
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS);
+ String segment1 = "segment1";
+ onlineSegments.add(segment1);
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS);
+ String segment2 = "segment2";
+ onlineSegments.add(segment2);
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS);
+ segmentPruner.init(idealState, externalView, onlineSegments);
+
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1)));
+ }
+
@Test(dataProvider = "compilerProvider")
public void testEmptySegmentPruner(QueryCompiler compiler) {
BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org