You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/05/02 04:10:49 UTC
[druid] branch 26.0.0 updated: TimeBoundary: Use cursor when datasource is not a regular table. (#14151) (#14191)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 26.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/26.0.0 by this push:
new c26cea2e47 TimeBoundary: Use cursor when datasource is not a regular table. (#14151) (#14191)
c26cea2e47 is described below
commit c26cea2e4748d897aed1485eede54d2f5c157298
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon May 1 21:10:42 2023 -0700
TimeBoundary: Use cursor when datasource is not a regular table. (#14151) (#14191)
* TimeBoundary: Use cursor when datasource is not a regular table.
Fixes a bug where TimeBoundary could return incorrect results with
INNER Join or inline data.
* Addl Javadocs.
Co-authored-by: Gian Merlino <gi...@gmail.com>
---
.../TimeBoundaryQueryRunnerFactory.java | 85 +++++++++++++++-------
.../org/apache/druid/segment/StorageAdapter.java | 22 ++++++
.../timeboundary/TimeBoundaryQueryRunnerTest.java | 56 ++++++++++++--
.../sql/calcite/CalciteTimeBoundaryQueryTest.java | 47 ++++++++++++
4 files changed, 176 insertions(+), 34 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index 71a1938eda..7a695987fb 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -37,6 +37,7 @@ import org.apache.druid.query.QueryRunnerHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.Cursor;
@@ -45,6 +46,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -52,6 +54,7 @@ import java.util.Iterator;
import java.util.List;
/**
+ *
*/
public class TimeBoundaryQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
@@ -142,7 +145,7 @@ public class TimeBoundaryQueryRunnerFactory
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class);
}
- final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input;
+ final TimeBoundaryQuery query = (TimeBoundaryQuery) input;
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>()
@@ -155,26 +158,31 @@ public class TimeBoundaryQueryRunnerFactory
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
- final DateTime minTime;
- final DateTime maxTime;
-
- if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
- minTime = getTimeBoundary(adapter, legacyQuery, false);
- if (minTime == null) {
- maxTime = null;
- } else {
- maxTime = getTimeBoundary(adapter, legacyQuery, true);
+
+ DateTime minTime = null;
+ DateTime maxTime = null;
+
+ if (canUseAdapterMinMaxTime(query, adapter)) {
+ if (!query.isMaxTime()) {
+ minTime = adapter.getMinTime();
+ }
+
+ if (!query.isMinTime()) {
+ maxTime = adapter.getMaxTime();
}
} else {
- minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
- ? null
- : adapter.getMinTime();
- maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
- ? null
- : adapter.getMaxTime();
+ if (!query.isMaxTime()) {
+ minTime = getTimeBoundary(adapter, query, false);
+ }
+
+ if (!query.isMinTime()) {
+ if (query.isMaxTime() || minTime != null) {
+ maxTime = getTimeBoundary(adapter, query, true);
+ }
+ }
}
- return legacyQuery.buildResult(
+ return query.buildResult(
adapter.getInterval().getStart(),
minTime,
maxTime
@@ -186,17 +194,42 @@ public class TimeBoundaryQueryRunnerFactory
{
}
-
- private boolean queryIntervalContainsAdapterInterval()
- {
- List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
- if (queryIntervals.size() != 1) {
- throw new IAE("Should only have one interval, got[%s]", queryIntervals);
- }
- return queryIntervals.get(0).contains(adapter.getInterval());
- }
}
);
}
}
+
+ /**
+ * Whether a particular {@link TimeBoundaryQuery} can use {@link StorageAdapter#getMinTime()} and/or
+ * {@link StorageAdapter#getMaxTime()}. If false, must use {@link StorageAdapter#makeCursors}.
+ */
+ private static boolean canUseAdapterMinMaxTime(final TimeBoundaryQuery query, final StorageAdapter adapter)
+ {
+ if (query.getFilter() != null) {
+ // We have to check which rows actually match the filter.
+ return false;
+ }
+
+ if (!(query.getDataSource() instanceof TableDataSource)) {
+ // In general, minTime / maxTime are only guaranteed to match data for regular tables.
+ //
+ // One example: an INNER JOIN can act as a filter and remove some rows. Another example: RowBasedStorageAdapter
+ // (used by e.g. inline data) uses nominal interval, not actual data, for minTime / maxTime.
+ return false;
+ }
+
+ final Interval queryInterval = CollectionUtils.getOnlyElement(
+ query.getQuerySegmentSpec().getIntervals(),
+ xs -> new IAE("Should only have one interval, got[%s]", xs)
+ );
+
+ if (!queryInterval.contains(adapter.getInterval())) {
+ // Query interval does not contain adapter interval. Need to create a cursor to see the first
+ // timestamp within the query interval.
+ return false;
+ }
+
+ // Passed all checks.
+ return true;
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
index ce23ae7d1c..2d3fc6a50b 100644
--- a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java
@@ -66,7 +66,29 @@ public interface StorageAdapter extends CursorFactory, ColumnInspector
* (or null) values.
*/
int getDimensionCardinality(String column);
+
+ /**
+ * Metadata-only operation that returns a lower bound on
+ * {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be earlier than
+ * the actual minimum data timestamp.
+ *
+ * For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter}
+ * specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), this method
+ * contract is tighter: it does return the actual minimum data timestamp. This fact is leveraged by
+ * {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only.
+ */
DateTime getMinTime();
+
+ /**
+ * Metadata-only operation that returns an upper bound on
+ * {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be later than
+ * the actual maximum data timestamp.
+ *
+ * For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter}
+ * specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), this method
+ * contract is tighter: it does return the actual maximum data timestamp. This fact is leveraged by
+ * {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only.
+ */
DateTime getMaxTime();
/**
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index 4b46bbcf87..02ba0ac44f 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -28,7 +28,9 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@@ -41,11 +43,14 @@ import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -64,9 +69,10 @@ import java.util.Arrays;
import java.util.List;
/**
+ *
*/
@RunWith(Parameterized.class)
-public class TimeBoundaryQueryRunnerTest
+public class TimeBoundaryQueryRunnerTest extends InitializedNullHandlingTest
{
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
@@ -98,7 +104,7 @@ public class TimeBoundaryQueryRunnerTest
"2011-01-12T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t100.000000",
"2011-01-13T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t100.000000",
"2011-01-13T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t100.000000",
- };
+ };
public static final String[] V_0113 = {
"2011-01-14T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t94.874713",
"2011-01-14T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
@@ -109,7 +115,7 @@ public class TimeBoundaryQueryRunnerTest
"2011-01-16T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
"2011-01-17T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t103.629399",
"2011-01-17T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
- };
+ };
private static IncrementalIndex newIndex(String minTimeStamp)
{
@@ -150,7 +156,8 @@ public class TimeBoundaryQueryRunnerTest
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1"));
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
- VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
+ VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(
+ StringComparators.LEXICOGRAPHIC);
timeline.add(
index0.getInterval(),
"v1",
@@ -197,7 +204,8 @@ public class TimeBoundaryQueryRunnerTest
.dataSource("testing")
.intervals(
new MultipleIntervalSegmentSpec(
- ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
+ ImmutableList.of(Intervals.of(
+ "2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
)
)
.build();
@@ -247,14 +255,46 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
}
+ @Test
+ public void testTimeBoundaryInlineData()
+ {
+ final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
+ ImmutableList.of(new Object[]{DateTimes.of("2000-01-02").getMillis()}),
+ RowSignature.builder().addTimeColumn().build()
+ );
+
+ TimeBoundaryQuery timeBoundaryQuery =
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(inlineDataSource)
+ .build();
+
+ Assert.assertFalse(timeBoundaryQuery.hasFilters());
+ final QueryRunner<Result<TimeBoundaryResultValue>> theRunner =
+ new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER).createRunner(
+ new RowBasedSegment<>(
+ SegmentId.dummy("dummy"),
+ Sequences.simple(inlineDataSource.getRows()),
+ inlineDataSource.rowAdapter(),
+ inlineDataSource.getRowSignature()
+ )
+ );
+ Iterable<Result<TimeBoundaryResultValue>> results = theRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();
+ TimeBoundaryResultValue val = results.iterator().next().getValue();
+ DateTime minTime = val.getMinTime();
+ DateTime maxTime = val.getMaxTime();
+
+ Assert.assertEquals(DateTimes.of("2000-01-02"), minTime);
+ Assert.assertEquals(DateTimes.of("2000-01-02"), maxTime);
+ }
+
@Test(expected = UOE.class)
@SuppressWarnings("unchecked")
public void testTimeBoundaryArrayResults()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
- .dataSource("testing")
- .bound(null)
- .build();
+ .dataSource("testing")
+ .bound(null)
+ .build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();
new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
index bb33b71411..c4cd002fbe 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
@@ -24,11 +24,15 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
import java.util.HashMap;
@@ -131,4 +135,47 @@ public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest
})
);
}
+
+ @Test
+ public void testMaxTimeQueryWithJoin()
+ {
+ // Cannot vectorize due to JOIN.
+ cannotVectorize();
+
+ HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+ context.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);
+
+ testBuilder()
+ .sql("SELECT MAX(t1.__time)\n"
+ + "FROM foo t1\n"
+ + "INNER JOIN foo t2 ON CAST(t1.m1 AS BIGINT) = t2.cnt\n")
+ .queryContext(context)
+ .expectedQueries(
+ ImmutableList.of(
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(
+ join(
+ new TableDataSource(CalciteTests.DATASOURCE1),
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("cnt")
+ .context(context)
+ .build()
+ ),
+ "j0.",
+ equalsCondition(makeExpression("CAST(\"m1\", 'LONG')"), makeColumnExpression("j0.cnt")),
+ JoinType.INNER
+ )
+
+ )
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(context)
+ .build()
+ )
+ )
+ .expectedResults(ImmutableList.of(new Object[]{946684800000L}))
+ .run();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org