You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/03/22 19:04:56 UTC

[pinot] branch master updated: Support NOT in TimeSegmentPruner (#8381)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bc96b13  Support NOT in TimeSegmentPruner (#8381)
bc96b13 is described below

commit bc96b13333a5d7da65b3f6b9f05d355a537e9647
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Mar 22 12:04:37 2022 -0700

    Support NOT in TimeSegmentPruner (#8381)
---
 .../routing/segmentpruner/TimeSegmentPruner.java   | 31 +++++++++++
 .../routing/segmentpruner/SegmentPrunerTest.java   | 60 +++++++++++++++++-----
 2 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index e84dabc..0b1984e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -200,6 +200,10 @@ public class TimeSegmentPruner implements SegmentPruner {
    *         < 50 OR firstName = Jason')
    *         Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND
    *         time > 100')
+   *         Sorted time intervals without overlapping if time condition is valid
+   *
+   * TODO: 1. Merge adjacent intervals
+   *       2. Set interval boundary using time granularity instead of millis
    */
   @Nullable
   private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
@@ -233,6 +237,14 @@ public class TimeSegmentPruner implements SegmentPruner {
           }
         }
         return getUnionSortedIntervals(orIntervals);
+      case NOT:
+        assert operands.size() == 1;
+        List<Interval> childIntervals = getFilterTimeIntervals(operands.get(0));
+        if (childIntervals == null) {
+          return null;
+        } else {
+          return getComplementSortedIntervals(childIntervals);
+        }
       case EQUALS: {
         Identifier identifier = operands.get(0).getIdentifier();
         if (identifier != null && identifier.getName().equals(_timeColumn)) {
@@ -479,6 +491,25 @@ public class TimeSegmentPruner implements SegmentPruner {
   }
 
   /**
+   * Returns the complement (non-overlapping sorted intervals) of the given non-overlapping sorted intervals.
+   */
+  private List<Interval> getComplementSortedIntervals(List<Interval> intervals) {
+    List<Interval> res = new ArrayList<>();
+    long startTime = MIN_START_TIME; // start of the next candidate gap
+    for (Interval interval : intervals) {
+      if (interval._min > startTime) { // uncovered gap before this interval
+        res.add(new Interval(startTime, interval._min - 1));
+      }
+      if (interval._max == MAX_END_TIME) { // input reaches the upper bound: no trailing gap
+        return res;
+      }
+      startTime = interval._max + 1; // next gap can only begin right after this interval
+    }
+    res.add(new Interval(startTime, MAX_END_TIME)); // trailing gap; whole range if input is empty
+    return res;
+  }
+
+  /**
    * Parse interval to millisecond as [min, max] with both sides included.
    * E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310]
    */
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 9f330e0..ad0f995 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -80,13 +80,13 @@ public class SegmentPrunerTest extends ControllerTest {
   private static final String QUERY_2 = "SELECT * FROM testTable where memberId = 0";
   private static final String QUERY_3 = "SELECT * FROM testTable where memberId IN (1, 2)";
 
-  private static final String QUERY_5 = "SELECT * FROM testTable where timeColumn = 40";
-  private static final String QUERY_6 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
-  private static final String QUERY_7 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
-  private static final String QUERY_8 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
-  private static final String QUERY_9 =
+  private static final String TIME_QUERY_1 = "SELECT * FROM testTable where timeColumn = 40";
+  private static final String TIME_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
+  private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
+  private static final String TIME_QUERY_4 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
+  private static final String TIME_QUERY_5 =
       "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)";
-  private static final String QUERY_10 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";
+  private static final String TIME_QUERY_6 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";
 
   private static final String SDF_QUERY_1 = "SELECT * FROM testTable where timeColumn = 20200131";
   private static final String SDF_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20200101 AND 20200331";
@@ -97,6 +97,9 @@ public class SegmentPrunerTest extends ControllerTest {
   private static final String SDF_QUERY_5 =
       "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
 
+  private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30";
+  private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable WHERE NOT timeColumn > 30";
+
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
@@ -234,8 +237,8 @@ public class SegmentPrunerTest extends ControllerTest {
     assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
-    when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
-        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
+    when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
+        Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
     when(indexingConfig.getStreamConfigs()).thenReturn(
         Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
@@ -332,12 +335,12 @@ public class SegmentPrunerTest extends ControllerTest {
   @Test(dataProvider = "compilerProvider")
   public void testTimeSegmentPruner(QueryCompiler compiler) {
     BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
-    BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_5);
-    BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_6);
-    BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(QUERY_7);
-    BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(QUERY_8);
-    BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(QUERY_9);
-    BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(QUERY_10);
+    BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(TIME_QUERY_1);
+    BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(TIME_QUERY_2);
+    BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(TIME_QUERY_3);
+    BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(TIME_QUERY_4);
+    BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(TIME_QUERY_5);
+    BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(TIME_QUERY_6);
     // NOTE: Ideal state and external view are not used in the current implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
@@ -516,6 +519,35 @@ public class SegmentPrunerTest extends ControllerTest {
     assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet());
   }
 
+  @Test
+  public void testTimeSegmentPrunerSql() {
+    CalciteSqlCompiler compiler = new CalciteSqlCompiler();
+    BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(SQL_TIME_QUERY_1); // NOT BETWEEN 20 AND 30
+    BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(SQL_TIME_QUERY_2); // NOT timeColumn > 30
+    // NOTE: Ideal state and external view are not used in the current implementation
+    IdealState idealState = Mockito.mock(IdealState.class);
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+
+    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
+    setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
+
+    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore);
+    Set<String> onlineSegments = new HashSet<>();
+    String segment0 = "segment0";
+    onlineSegments.add(segment0);
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS); // overlaps both filters
+    String segment1 = "segment1";
+    onlineSegments.add(segment1);
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS); // inside [20, 30]
+    String segment2 = "segment2";
+    onlineSegments.add(segment2);
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS); // entirely above 30
+    segmentPruner.init(idealState, externalView, onlineSegments);
+
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1)));
+  }
+
   @Test(dataProvider = "compilerProvider")
   public void testEmptySegmentPruner(QueryCompiler compiler) {
     BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org