You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/05/02 04:10:49 UTC

[druid] branch 26.0.0 updated: TimeBoundary: Use cursor when datasource is not a regular table. (#14151) (#14191)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 26.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/26.0.0 by this push:
     new c26cea2e47 TimeBoundary: Use cursor when datasource is not a regular table. (#14151) (#14191)
c26cea2e47 is described below

commit c26cea2e4748d897aed1485eede54d2f5c157298
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon May 1 21:10:42 2023 -0700

    TimeBoundary: Use cursor when datasource is not a regular table. (#14151) (#14191)
    
    * TimeBoundary: Use cursor when datasource is not a regular table.
    
    Fixes a bug where TimeBoundary could return incorrect results with
    INNER Join or inline data.
    
    * Additional Javadocs.
    
    Co-authored-by: Gian Merlino <gi...@gmail.com>
---
 .../TimeBoundaryQueryRunnerFactory.java            | 85 +++++++++++++++-------
 .../org/apache/druid/segment/StorageAdapter.java   | 22 ++++++
 .../timeboundary/TimeBoundaryQueryRunnerTest.java  | 56 ++++++++++++--
 .../sql/calcite/CalciteTimeBoundaryQueryTest.java  | 47 ++++++++++++
 4 files changed, 176 insertions(+), 34 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index 71a1938eda..7a695987fb 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.QueryRunnerHelper;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.Cursor;
@@ -45,6 +46,7 @@ import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -52,6 +54,7 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
+ *
  */
 public class TimeBoundaryQueryRunnerFactory
     implements QueryRunnerFactory<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
@@ -142,7 +145,7 @@ public class TimeBoundaryQueryRunnerFactory
         throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class);
       }
 
-      final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input;
+      final TimeBoundaryQuery query = (TimeBoundaryQuery) input;
 
       return new BaseSequence<>(
           new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>()
@@ -155,26 +158,31 @@ public class TimeBoundaryQueryRunnerFactory
                     "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
                 );
               }
-              final DateTime minTime;
-              final DateTime maxTime;
-
-              if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
-                minTime = getTimeBoundary(adapter, legacyQuery, false);
-                if (minTime == null) {
-                  maxTime = null;
-                } else {
-                  maxTime = getTimeBoundary(adapter, legacyQuery, true);
+
+              DateTime minTime = null;
+              DateTime maxTime = null;
+
+              if (canUseAdapterMinMaxTime(query, adapter)) {
+                if (!query.isMaxTime()) {
+                  minTime = adapter.getMinTime();
+                }
+
+                if (!query.isMinTime()) {
+                  maxTime = adapter.getMaxTime();
                 }
               } else {
-                minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
-                          ? null
-                          : adapter.getMinTime();
-                maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
-                          ? null
-                          : adapter.getMaxTime();
+                if (!query.isMaxTime()) {
+                  minTime = getTimeBoundary(adapter, query, false);
+                }
+
+                if (!query.isMinTime()) {
+                  if (query.isMaxTime() || minTime != null) {
+                    maxTime = getTimeBoundary(adapter, query, true);
+                  }
+                }
               }
 
-              return legacyQuery.buildResult(
+              return query.buildResult(
                   adapter.getInterval().getStart(),
                   minTime,
                   maxTime
@@ -186,17 +194,42 @@ public class TimeBoundaryQueryRunnerFactory
             {
 
             }
-
-            private boolean queryIntervalContainsAdapterInterval()
-            {
-              List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
-              if (queryIntervals.size() != 1) {
-                throw new IAE("Should only have one interval, got[%s]", queryIntervals);
-              }
-              return queryIntervals.get(0).contains(adapter.getInterval());
-            }
           }
       );
     }
   }
+
+  /**
+   * Whether a particular {@link TimeBoundaryQuery} can use {@link StorageAdapter#getMinTime()} and/or
+   * {@link StorageAdapter#getMaxTime()}. If false, must use {@link StorageAdapter#makeCursors}.
+   */
+  private static boolean canUseAdapterMinMaxTime(final TimeBoundaryQuery query, final StorageAdapter adapter)
+  {
+    if (query.getFilter() != null) {
+      // We have to check which rows actually match the filter.
+      return false;
+    }
+
+    if (!(query.getDataSource() instanceof TableDataSource)) {
+      // In general, minTime / maxTime are only guaranteed to match data for regular tables.
+      //
+      // One example: an INNER JOIN can act as a filter and remove some rows. Another example: RowBasedStorageAdapter
+      // (used by e.g. inline data) uses nominal interval, not actual data, for minTime / maxTime.
+      return false;
+    }
+
+    final Interval queryInterval = CollectionUtils.getOnlyElement(
+        query.getQuerySegmentSpec().getIntervals(),
+        xs -> new IAE("Should only have one interval, got[%s]", xs)
+    );
+
+    if (!queryInterval.contains(adapter.getInterval())) {
+      // Query interval does not contain adapter interval. Need to create a cursor to find the earliest
+      // and/or latest timestamp that actually falls within the query interval.
+      return false;
+    }
+
+    // Passed all checks.
+    return true;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
index ce23ae7d1c..2d3fc6a50b 100644
--- a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
@@ -66,7 +66,29 @@ public interface StorageAdapter extends CursorFactory, ColumnInspector
    * (or null) values.
    */
   int getDimensionCardinality(String column);
+
+  /**
+   * Metadata-only operation that returns a lower bound on
+   * {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be earlier than
+   * the actual minimum data timestamp.
+   *
+   * For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter}
+   * specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), this method's
+   * contract is tighter: it does return the actual minimum data timestamp. This fact is leveraged by
+   * {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only.
+   */
   DateTime getMinTime();
+
+  /**
+   * Metadata-only operation that returns an upper bound on
+   * {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be later than
+   * the actual maximum data timestamp.
+   *
+   * For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter}
+   * specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), this method's
+   * contract is tighter: it does return the actual maximum data timestamp. This fact is leveraged by
+   * {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only.
+   */
   DateTime getMaxTime();
 
   /**
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index 4b46bbcf87..02ba0ac44f 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -28,7 +28,9 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
@@ -41,11 +43,14 @@ import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.RowBasedSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -64,9 +69,10 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
+ *
  */
 @RunWith(Parameterized.class)
-public class TimeBoundaryQueryRunnerTest
+public class TimeBoundaryQueryRunnerTest extends InitializedNullHandlingTest
 {
   @Parameterized.Parameters(name = "{0}")
   public static Iterable<Object[]> constructorFeeder()
@@ -98,7 +104,7 @@ public class TimeBoundaryQueryRunnerTest
       "2011-01-12T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t100.000000",
       "2011-01-13T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t100.000000",
       "2011-01-13T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t100.000000",
-  };
+      };
   public static final String[] V_0113 = {
       "2011-01-14T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t94.874713",
       "2011-01-14T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
@@ -109,7 +115,7 @@ public class TimeBoundaryQueryRunnerTest
       "2011-01-16T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
       "2011-01-17T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t103.629399",
       "2011-01-17T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
-  };
+      };
 
   private static IncrementalIndex newIndex(String minTimeStamp)
   {
@@ -150,7 +156,8 @@ public class TimeBoundaryQueryRunnerTest
     segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1"));
     segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
 
-    VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
+    VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(
+        StringComparators.LEXICOGRAPHIC);
     timeline.add(
         index0.getInterval(),
         "v1",
@@ -197,7 +204,8 @@ public class TimeBoundaryQueryRunnerTest
                                                 .dataSource("testing")
                                                 .intervals(
                                                     new MultipleIntervalSegmentSpec(
-                                                        ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
+                                                        ImmutableList.of(Intervals.of(
+                                                            "2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
                                                     )
                                                 )
                                                 .build();
@@ -247,14 +255,46 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
   }
 
+  @Test
+  public void testTimeBoundaryInlineData()
+  {
+    final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
+        ImmutableList.of(new Object[]{DateTimes.of("2000-01-02").getMillis()}),
+        RowSignature.builder().addTimeColumn().build()
+    );
+
+    TimeBoundaryQuery timeBoundaryQuery =
+        Druids.newTimeBoundaryQueryBuilder()
+              .dataSource(inlineDataSource)
+              .build();
+
+    Assert.assertFalse(timeBoundaryQuery.hasFilters());
+    final QueryRunner<Result<TimeBoundaryResultValue>> theRunner =
+        new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER).createRunner(
+            new RowBasedSegment<>(
+                SegmentId.dummy("dummy"),
+                Sequences.simple(inlineDataSource.getRows()),
+                inlineDataSource.rowAdapter(),
+                inlineDataSource.getRowSignature()
+            )
+        );
+    Iterable<Result<TimeBoundaryResultValue>> results = theRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();
+    TimeBoundaryResultValue val = results.iterator().next().getValue();
+    DateTime minTime = val.getMinTime();
+    DateTime maxTime = val.getMaxTime();
+
+    Assert.assertEquals(DateTimes.of("2000-01-02"), minTime);
+    Assert.assertEquals(DateTimes.of("2000-01-02"), maxTime);
+  }
+
   @Test(expected = UOE.class)
   @SuppressWarnings("unchecked")
   public void testTimeBoundaryArrayResults()
   {
     TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
-                                                   .dataSource("testing")
-                                                   .bound(null)
-                                                   .build();
+                                                .dataSource("testing")
+                                                .bound(null)
+                                                .build();
     ResponseContext context = ConcurrentResponseContext.createEmpty();
     context.initializeMissingSegments();
     new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
index bb33b71411..c4cd002fbe 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
@@ -24,11 +24,15 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -131,4 +135,47 @@ public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest
         })
     );
   }
+
+  @Test
+  public void testMaxTimeQueryWithJoin()
+  {
+    // Cannot vectorize due to JOIN.
+    cannotVectorize();
+
+    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    context.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
+
+    testBuilder()
+        .sql("SELECT MAX(t1.__time)\n"
+             + "FROM foo t1\n"
+             + "INNER JOIN foo t2 ON CAST(t1.m1 AS BIGINT) = t2.cnt\n")
+        .queryContext(context)
+        .expectedQueries(
+            ImmutableList.of(
+                Druids.newTimeBoundaryQueryBuilder()
+                      .dataSource(
+                          join(
+                              new TableDataSource(CalciteTests.DATASOURCE1),
+                              new QueryDataSource(
+                                  newScanQueryBuilder()
+                                      .dataSource(CalciteTests.DATASOURCE1)
+                                      .intervals(querySegmentSpec(Filtration.eternity()))
+                                      .columns("cnt")
+                                      .context(context)
+                                      .build()
+                              ),
+                              "j0.",
+                              equalsCondition(makeExpression("CAST(\"m1\", 'LONG')"), makeColumnExpression("j0.cnt")),
+                              JoinType.INNER
+                          )
+
+                      )
+                      .bound(TimeBoundaryQuery.MAX_TIME)
+                      .context(context)
+                      .build()
+            )
+        )
+        .expectedResults(ImmutableList.of(new Object[]{946684800000L}))
+        .run();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org