You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2022/04/25 15:19:11 UTC
[druid] branch master updated: Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)
This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 95694b5afa Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)
95694b5afa is described below
commit 95694b5afa505aea906f05db41c8901559b8bd2b
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Mon Apr 25 20:48:58 2022 +0530
Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)
* Support array based results in timeBoundary query
* Fix bug with query interval in timeBoundary
* Convert min(__time) and max(__time) SQL queries to timeBoundary
* Add tests for timeBoundary backed SQL queries
* Fix query plans for existing tests
* fixup! Convert min(__time) and max(__time) SQL queries to timeBoundary
* fixup! Add tests for timeBoundary backed SQL queries
* fixup! Fix bug with query interval in timeBoundary
---
.../query/timeboundary/TimeBoundaryQuery.java | 2 +
.../TimeBoundaryQueryQueryToolChest.java | 38 +++++++
.../TimeBoundaryQueryRunnerFactory.java | 13 ++-
.../TimeBoundaryQueryQueryToolChestTest.java | 40 +++++++
.../timeboundary/TimeBoundaryQueryRunnerTest.java | 87 ++++++++++++++
.../org/apache/druid/server/QueryStackTests.java | 3 +
.../apache/druid/sql/calcite/rel/DruidQuery.java | 73 ++++++++++++
.../druid/sql/calcite/CalciteJoinQueryTest.java | 126 +++++++++++----------
.../sql/calcite/CalciteTimeBoundaryQueryTest.java | 121 ++++++++++++++++++++
9 files changed, 441 insertions(+), 62 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java
index 8c328ca29a..240ede642a 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java
@@ -48,6 +48,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
private static final QuerySegmentSpec DEFAULT_SEGMENT_SPEC = new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY);
public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime";
+ public static final String MAX_TIME_ARRAY_OUTPUT_NAME = "maxTimeArrayOutputName";
+ public static final String MIN_TIME_ARRAY_OUTPUT_NAME = "minTimeArrayOutputName";
private static final byte CACHE_TYPE_ID = 0x0;
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index d6a402fcc8..9dc0859f4e 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -42,6 +42,8 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.LogicalSegment;
import java.nio.ByteBuffer;
@@ -224,4 +226,40 @@ public class TimeBoundaryQueryQueryToolChest
}
};
}
+
+ @Override
+ public RowSignature resultArraySignature(TimeBoundaryQuery query)
+ {
+ if (query.isMinTime() || query.isMaxTime()) {
+ RowSignature.Builder builder = RowSignature.builder();
+ String outputName = query.isMinTime() ?
+ query.getContextValue(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MIN_TIME) :
+ query.getContextValue(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MAX_TIME);
+ return builder.add(outputName, ColumnType.LONG).build();
+ }
+ return super.resultArraySignature(query);
+ }
+
+ @Override
+ public Sequence<Object[]> resultsAsArrays(
+ TimeBoundaryQuery query,
+ Sequence<Result<TimeBoundaryResultValue>> resultSequence
+ )
+ {
+ if (query.isMaxTime()) {
+ return Sequences.map(
+ resultSequence,
+ result -> result == null || result.getValue() == null || result.getValue().getMaxTime() == null ? null :
+ new Object[]{result.getValue().getMaxTime().getMillis()}
+ );
+ } else if (query.isMinTime()) {
+ return Sequences.map(
+ resultSequence,
+ result -> result == null || result.getValue() == null || result.getValue().getMinTime() == null ? null :
+ new Object[]{result.getValue().getMinTime().getMillis()}
+ );
+ } else {
+ return super.resultsAsArrays(query, resultSequence);
+ }
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index 07775d4528..2fe21dafbf 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.timeboundary;
import com.google.common.base.Function;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.BaseSequence;
@@ -45,6 +46,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.DateTime;
+import org.joda.time.Interval;
import java.util.Iterator;
import java.util.List;
@@ -155,7 +157,7 @@ public class TimeBoundaryQueryRunnerFactory
final DateTime minTime;
final DateTime maxTime;
- if (legacyQuery.getFilter() != null) {
+ if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
minTime = getTimeBoundary(adapter, legacyQuery, false);
if (minTime == null) {
maxTime = null;
@@ -183,6 +185,15 @@ public class TimeBoundaryQueryRunnerFactory
{
}
+
+ private boolean queryIntervalContainsAdapterInterval()
+ {
+ List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
+ if (queryIntervals.size() != 1) {
+ throw new IAE("Should only have one interval, got[%s]", queryIntervals);
+ }
+ return queryIntervals.get(0).contains(adapter.getInterval());
+ }
}
);
}
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
index 6ab886fe08..6a96f01360 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
@@ -25,11 +25,14 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.LogicalSegment;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -289,6 +292,43 @@ public class TimeBoundaryQueryQueryToolChestTest
Assert.assertEquals(7, segments.size());
}
+ @Test(expected = UOE.class)
+ public void testResultArraySignature()
+ {
+ TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .build();
+ new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+ }
+
+ @Test
+ public void testResultArraySignatureWithMinTime()
+ {
+ TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .bound(TimeBoundaryQuery.MIN_TIME)
+ .context(ImmutableMap.of(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "foo"))
+ .build();
+ RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+ RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
+ expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
+ Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
+ }
+
+ @Test
+ public void testResultArraySignatureWithMaxTime()
+ {
+ TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(ImmutableMap.of(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "foo"))
+ .build();
+ RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+ RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
+ expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
+ Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
+ }
+
@Test
public void testCacheStrategy() throws Exception
{
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index e1ceab34de..4b46bbcf87 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -19,11 +19,14 @@
package org.apache.druid.query.timeboundary;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.io.CharSource;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
@@ -35,6 +38,7 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
@@ -47,6 +51,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -183,6 +188,32 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertEquals(DateTimes.of("2011-01-16T00:00:00.000Z"), maxTime);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTimeFilteredTimeBoundaryQuery() throws IOException
+ {
+ QueryRunner customRunner = getCustomRunner();
+ TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .intervals(
+ new MultipleIntervalSegmentSpec(
+ ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
+ )
+ )
+ .build();
+ List<Result<TimeBoundaryResultValue>> results =
+ customRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();
+
+ Assert.assertTrue(Iterables.size(results) > 0);
+
+ TimeBoundaryResultValue val = results.iterator().next().getValue();
+ DateTime minTime = val.getMinTime();
+ DateTime maxTime = val.getMaxTime();
+
+ Assert.assertEquals(DateTimes.of("2011-01-15T00:00:00.000Z"), minTime);
+ Assert.assertEquals(DateTimes.of("2011-01-15T01:00:00.000Z"), maxTime);
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testFilteredTimeBoundaryQueryNoMatches() throws IOException
@@ -216,6 +247,22 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
}
+ @Test(expected = UOE.class)
+ @SuppressWarnings("unchecked")
+ public void testTimeBoundaryArrayResults()
+ {
+ TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .bound(null)
+ .build();
+ ResponseContext context = ConcurrentResponseContext.createEmpty();
+ context.initializeMissingSegments();
+ new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+ timeBoundaryQuery,
+ runner.run(QueryPlus.wrap(timeBoundaryQuery), context)
+ ).toList();
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMax()
@@ -235,6 +282,26 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTimeBoundaryMaxArraysResults()
+ {
+ TimeBoundaryQuery maxTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .build();
+ ResponseContext context = ConcurrentResponseContext.createEmpty();
+ context.initializeMissingSegments();
+ List<Object[]> maxTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+ maxTimeBoundaryQuery,
+ runner.run(QueryPlus.wrap(maxTimeBoundaryQuery), context)
+ ).toList();
+
+ Long maxTimeMillis = (Long) maxTime.get(0)[0];
+ Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), new DateTime(maxTimeMillis, DateTimeZone.UTC));
+ Assert.assertEquals(1, maxTime.size());
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testTimeBoundaryMin()
@@ -254,6 +321,26 @@ public class TimeBoundaryQueryRunnerTest
Assert.assertNull(maxTime);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTimeBoundaryMinArraysResults()
+ {
+ TimeBoundaryQuery minTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("testing")
+ .bound(TimeBoundaryQuery.MIN_TIME)
+ .build();
+ ResponseContext context = ConcurrentResponseContext.createEmpty();
+ context.initializeMissingSegments();
+ List<Object[]> minTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+ minTimeBoundaryQuery,
+ runner.run(QueryPlus.wrap(minTimeBoundaryQuery), context)
+ ).toList();
+
+ Long minTimeMillis = (Long) minTime.get(0)[0];
+ Assert.assertEquals(DateTimes.of("2011-01-12T00:00:00.000Z"), new DateTime(minTimeMillis, DateTimeZone.UTC));
+ Assert.assertEquals(1, minTime.size());
+ }
+
@Test
public void testMergeResults()
{
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 9e7c234c0c..0dfe0d5362 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -55,6 +55,8 @@ import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -344,6 +346,7 @@ public class QueryStackTests
)
)
.put(GroupByQuery.class, groupByQueryRunnerFactory)
+ .put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER))
.build()
);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 98f2dfe045..f9f5083f17 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -49,7 +49,11 @@ import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
@@ -59,6 +63,7 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.DimensionTopNMetricSpec;
import org.apache.druid.query.topn.InvertedTopNMetricSpec;
@@ -799,6 +804,11 @@ public class DruidQuery
}
}
+ final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery();
+ if (timeBoundaryQuery != null) {
+ return timeBoundaryQuery;
+ }
+
final TimeseriesQuery tsQuery = toTimeseriesQuery(queryFeatureInspector);
if (tsQuery != null) {
return tsQuery;
@@ -822,6 +832,69 @@ public class DruidQuery
throw new CannotBuildQueryException("Cannot convert query parts into an actual query");
}
+ /**
+ * Return this query as a TimeBoundary query, or null if this query is not compatible with TimeBoundary.
+ *
+ * @return a TimeBoundaryQuery if possible. null if it is not possible to construct one.
+ */
+ @Nullable
+ private TimeBoundaryQuery toTimeBoundaryQuery()
+ {
+ if (grouping == null
+ || grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
+ || grouping.getHavingFilter() != null
+ || selectProjection != null) {
+ return null;
+ }
+
+ if (sorting != null && sorting.getOffsetLimit().hasOffset()) {
+ // TimeBoundary queries cannot handle offsets.
+ return null;
+ }
+
+ if (grouping.getDimensions().isEmpty() &&
+ grouping.getPostAggregators().isEmpty() &&
+ grouping.getAggregatorFactories().size() == 1) { // currently only handles max(__time) or min(__time) not both
+ boolean minTime;
+ AggregatorFactory aggregatorFactory = Iterables.getOnlyElement(grouping.getAggregatorFactories());
+ if (aggregatorFactory instanceof LongMaxAggregatorFactory ||
+ aggregatorFactory instanceof LongMinAggregatorFactory) {
+ SimpleLongAggregatorFactory minMaxFactory = (SimpleLongAggregatorFactory) aggregatorFactory;
+ String fieldName = minMaxFactory.getFieldName();
+ if (fieldName == null ||
+ !fieldName.equals(ColumnHolder.TIME_COLUMN_NAME) ||
+ (minMaxFactory.getExpression() != null && !minMaxFactory.getExpression().isEmpty())) {
+ return null;
+ }
+ minTime = aggregatorFactory instanceof LongMinAggregatorFactory;
+ } else {
+ return null;
+ }
+ final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration(
+ dataSource,
+ filter,
+ virtualColumnRegistry
+ );
+ final DataSource newDataSource = dataSourceFiltrationPair.lhs;
+ final Filtration filtration = dataSourceFiltrationPair.rhs;
+ String bound = minTime ? TimeBoundaryQuery.MIN_TIME : TimeBoundaryQuery.MAX_TIME;
+ HashMap<String, Object> context = new HashMap<>(plannerContext.getQueryContext().getMergedParams());
+ if (minTime) {
+ context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName());
+ } else {
+ context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName());
+ }
+ return new TimeBoundaryQuery(
+ newDataSource,
+ filtration.getQuerySegmentSpec(),
+ bound,
+ filtration.getDimFilter(),
+ context
+ );
+ }
+ return null;
+ }
+
/**
* Return this query as a Timeseries query, or null if this query is not compatible with Timeseries.
*
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index 2d5b3b043d..1c4227102f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -72,6 +72,7 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.topn.DimensionTopNMetricSpec;
import org.apache.druid.query.topn.InvertedTopNMetricSpec;
import org.apache.druid.query.topn.NumericTopNMetricSpec;
@@ -2387,6 +2388,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
cannotVectorize();
}
+ Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+ maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT DISTINCT __time FROM druid.foo WHERE __time IN (SELECT MAX(__time) FROM druid.foo)",
queryContext,
@@ -2396,14 +2399,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
- .withOverriddenContext(queryContext)
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(maxTimeQueryContext)
+ .build()
),
"j0.",
equalsCondition(
@@ -2433,6 +2434,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
// Cannot vectorize JOIN operator.
cannotVectorize();
+ Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+ maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT DISTINCT __time FROM druid.foo WHERE __time NOT IN (SELECT MAX(__time) FROM druid.foo)",
queryContext,
@@ -2446,13 +2449,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
GroupByQuery
.builder()
.setDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(maxTimeQueryContext)
+ .build()
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
@@ -3566,6 +3568,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
cannotVectorize();
}
+ Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+ maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT dim1, COUNT(*) FROM foo\n"
+ "WHERE dim1 IN ('abc', 'def')"
@@ -3580,28 +3584,26 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .filters(selector("cnt", "1", null))
- .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .filters(selector("cnt", "1", null))
+ .context(maxTimeQueryContext)
+ .build()
),
"j0.",
"(\"__time\" == \"j0.a0\")",
JoinType.INNER
),
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .filters(not(selector("cnt", "2", null)))
- .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .filters(not(selector("cnt", "2", null)))
+ .context(maxTimeQueryContext)
+ .build()
),
"_j0.",
"(\"__time\" == \"_j0.a0\")",
@@ -3626,6 +3628,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
{
cannotVectorize();
+ Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
+ minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+ Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+ maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT dim1, COUNT(*) FROM foo\n"
+ "WHERE dim1 IN ('abc', 'def')\n"
@@ -3641,13 +3647,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(maxTimeQueryContext)
+ .build()
),
"j0.",
"(\"__time\" == \"j0.a0\")",
@@ -3657,15 +3662,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .aggregators(
- new LongMinAggregatorFactory("a0", "__time")
- )
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MIN_TIME)
+ .context(minTimeQueryContext)
+ .build()
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@@ -3730,6 +3732,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
{
cannotVectorize();
+ Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
+ minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+ Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+ maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
testQuery(
"SELECT dim1, COUNT(*) FROM\n"
+ "foo\n"
@@ -3745,26 +3751,24 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(maxTimeQueryContext)
+ .build()
),
"j0.",
"(\"__time\" == \"j0.a0\")",
JoinType.INNER
),
new QueryDataSource(
- Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.DATASOURCE1)
- .intervals(querySegmentSpec(Filtration.eternity()))
- .granularity(Granularities.ALL)
- .aggregators(new LongMinAggregatorFactory("a0", "__time"))
- .context(QUERY_CONTEXT_DEFAULT)
- .build()
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .bound(TimeBoundaryQuery.MIN_TIME)
+ .context(minTimeQueryContext)
+ .build()
),
"_j0.",
"(\"__time\" == \"_j0.a0\")",
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
new file mode 100644
index 0000000000..74e42c5365
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest
+{
+ // __time for foo is [2000-01-01, 2000-01-02, 2000-01-03, 2001-01-01, 2001-01-02, 2001-01-03]
+ @Test
+ public void testMaxTimeQuery() throws Exception
+ {
+ HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+ context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
+ testQuery(
+ "SELECT MAX(__time) AS maxTime FROM foo",
+ ImmutableList.of(
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("foo")
+ .bound(TimeBoundaryQuery.MAX_TIME)
+ .context(context)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{DateTimes.of("2001-01-03").getMillis()})
+ );
+ }
+
+ @Test
+ public void testMinTimeQuery() throws Exception
+ {
+ HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+ context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+ testQuery(
+ "SELECT MIN(__time) AS minTime FROM foo",
+ ImmutableList.of(
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("foo")
+ .bound(TimeBoundaryQuery.MIN_TIME)
+ .context(context)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{DateTimes.of("2000-01-01").getMillis()})
+ );
+ }
+
+ @Test
+ public void testMinTimeQueryWithFilters() throws Exception
+ {
+ HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+ context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+ testQuery(
+ "SELECT MIN(__time) AS minTime FROM foo where __time >= '2001-01-01' and __time < '2003-01-01'",
+ ImmutableList.of(
+ Druids.newTimeBoundaryQueryBuilder()
+ .dataSource("foo")
+ .intervals(
+ new MultipleIntervalSegmentSpec(
+ ImmutableList.of(Intervals.of("2001-01-01T00:00:00.000Z/2003-01-01T00:00:00.000Z"))
+ )
+ )
+ .bound(TimeBoundaryQuery.MIN_TIME)
+ .context(context)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{DateTimes.of("2001-01-01").getMillis()})
+ );
+ }
+
+ // Currently, if both min(__time) and max(__time) are present, we don't convert it
+ // to a timeBoundary query. (ref : https://github.com/apache/druid/issues/12479)
+ @Test
+ public void testMinMaxTimeQuery() throws Exception
+ {
+ testQuery(
+ "SELECT MIN(__time) AS minTime, MAX(__time) as maxTime FROM foo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("foo")
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .aggregators(
+ new LongMinAggregatorFactory("a0", "__time"),
+ new LongMaxAggregatorFactory("a1", "__time")
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{
+ DateTimes.of("2000-01-01").getMillis(),
+ DateTimes.of("2001-01-03").getMillis()
+ })
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org