You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2022/04/25 15:19:11 UTC

[druid] branch master updated: Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 95694b5afa Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)
95694b5afa is described below

commit 95694b5afa505aea906f05db41c8901559b8bd2b
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Mon Apr 25 20:48:58 2022 +0530

    Convert simple min/max SQL queries on __time to timeBoundary queries (#12472)
    
    * Support array based results in timeBoundary query
    
    * Fix bug with query interval in timeBoundary
    
    * Convert min(__time) and max(__time) SQL queries to timeBoundary
    
    * Add tests for timeBoundary backed SQL queries
    
    * Fix query plans for existing tests
    
    * fixup! Convert min(__time) and max(__time) SQL queries to timeBoundary
    
    * fixup! Add tests for timeBoundary backed SQL queries
    
    * fixup! Fix bug with query interval in timeBoundary
---
 .../query/timeboundary/TimeBoundaryQuery.java      |   2 +
 .../TimeBoundaryQueryQueryToolChest.java           |  38 +++++++
 .../TimeBoundaryQueryRunnerFactory.java            |  13 ++-
 .../TimeBoundaryQueryQueryToolChestTest.java       |  40 +++++++
 .../timeboundary/TimeBoundaryQueryRunnerTest.java  |  87 ++++++++++++++
 .../org/apache/druid/server/QueryStackTests.java   |   3 +
 .../apache/druid/sql/calcite/rel/DruidQuery.java   |  73 ++++++++++++
 .../druid/sql/calcite/CalciteJoinQueryTest.java    | 126 +++++++++++----------
 .../sql/calcite/CalciteTimeBoundaryQueryTest.java  | 121 ++++++++++++++++++++
 9 files changed, 441 insertions(+), 62 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java
index 8c328ca29a..240ede642a 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQuery.java
@@ -48,6 +48,8 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>
   private static final QuerySegmentSpec DEFAULT_SEGMENT_SPEC = new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY);
   public static final String MAX_TIME = "maxTime";
   public static final String MIN_TIME = "minTime";
+  public static final String MAX_TIME_ARRAY_OUTPUT_NAME = "maxTimeArrayOutputName";
+  public static final String MIN_TIME_ARRAY_OUTPUT_NAME = "minTimeArrayOutputName";
 
   private static final byte CACHE_TYPE_ID = 0x0;
 
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index d6a402fcc8..9dc0859f4e 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -42,6 +42,8 @@ import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.MetricManipulationFn;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.LogicalSegment;
 
 import java.nio.ByteBuffer;
@@ -224,4 +226,40 @@ public class TimeBoundaryQueryQueryToolChest
       }
     };
   }
+
+  @Override
+  public RowSignature resultArraySignature(TimeBoundaryQuery query)
+  {
+    if (query.isMinTime() || query.isMaxTime()) {
+      RowSignature.Builder builder = RowSignature.builder();
+      String outputName = query.isMinTime() ?
+                          query.getContextValue(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MIN_TIME) :
+                          query.getContextValue(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MAX_TIME);
+      return builder.add(outputName, ColumnType.LONG).build();
+    }
+    return super.resultArraySignature(query);
+  }
+
+  @Override
+  public Sequence<Object[]> resultsAsArrays(
+      TimeBoundaryQuery query,
+      Sequence<Result<TimeBoundaryResultValue>> resultSequence
+  )
+  {
+    if (query.isMaxTime()) {
+      return Sequences.map(
+          resultSequence,
+          result -> result == null || result.getValue() == null || result.getValue().getMaxTime() == null ? null :
+                    new Object[]{result.getValue().getMaxTime().getMillis()}
+      );
+    } else if (query.isMinTime()) {
+      return Sequences.map(
+          resultSequence,
+          result -> result == null || result.getValue() == null || result.getValue().getMinTime() == null ? null :
+                    new Object[]{result.getValue().getMinTime().getMillis()}
+      );
+    } else {
+      return super.resultsAsArrays(query, resultSequence);
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
index 07775d4528..2fe21dafbf 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.timeboundary;
 import com.google.common.base.Function;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.BaseSequence;
@@ -45,6 +46,7 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.filter.Filters;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 
 import java.util.Iterator;
 import java.util.List;
@@ -155,7 +157,7 @@ public class TimeBoundaryQueryRunnerFactory
               final DateTime minTime;
               final DateTime maxTime;
 
-              if (legacyQuery.getFilter() != null) {
+              if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
                 minTime = getTimeBoundary(adapter, legacyQuery, false);
                 if (minTime == null) {
                   maxTime = null;
@@ -183,6 +185,15 @@ public class TimeBoundaryQueryRunnerFactory
             {
 
             }
+
+            private boolean queryIntervalContainsAdapterInterval()
+            {
+              List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
+              if (queryIntervals.size() != 1) {
+                throw new IAE("Should only have one interval, got[%s]", queryIntervals);
+              }
+              return queryIntervals.get(0).contains(adapter.getInterval());
+            }
           }
       );
     }
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
index 6ab886fe08..6a96f01360 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChestTest.java
@@ -25,11 +25,14 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.LogicalSegment;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -289,6 +292,43 @@ public class TimeBoundaryQueryQueryToolChestTest
     Assert.assertEquals(7, segments.size());
   }
 
+  @Test(expected = UOE.class)
+  public void testResultArraySignature()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .build();
+    new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+  }
+
+  @Test
+  public void testResultArraySignatureWithMinTime()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .bound(TimeBoundaryQuery.MIN_TIME)
+                                                .context(ImmutableMap.of(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "foo"))
+                                                .build();
+    RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+    RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
+    expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
+    Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
+  }
+
+  @Test
+  public void testResultArraySignatureWithMaxTime()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .bound(TimeBoundaryQuery.MAX_TIME)
+                                                .context(ImmutableMap.of(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "foo"))
+                                                .build();
+    RowSignature rowSignature = new TimeBoundaryQueryQueryToolChest().resultArraySignature(timeBoundaryQuery);
+    RowSignature.Builder expectedRowSignatureBuilder = RowSignature.builder();
+    expectedRowSignatureBuilder.add("foo", ColumnType.LONG);
+    Assert.assertEquals(expectedRowSignatureBuilder.build(), rowSignature);
+  }
+
   @Test
   public void testCacheStrategy() throws Exception
   {
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index e1ceab34de..4b46bbcf87 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.druid.query.timeboundary;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.io.CharSource;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryPlus;
@@ -35,6 +38,7 @@ import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.context.ConcurrentResponseContext;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
@@ -47,6 +51,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -183,6 +188,32 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-01-16T00:00:00.000Z"), maxTime);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeFilteredTimeBoundaryQuery() throws IOException
+  {
+    QueryRunner customRunner = getCustomRunner();
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                .dataSource("testing")
+                                                .intervals(
+                                                    new MultipleIntervalSegmentSpec(
+                                                        ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
+                                                    )
+                                                )
+                                                .build();
+    List<Result<TimeBoundaryResultValue>> results =
+        customRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();
+
+    Assert.assertTrue(Iterables.size(results) > 0);
+
+    TimeBoundaryResultValue val = results.iterator().next().getValue();
+    DateTime minTime = val.getMinTime();
+    DateTime maxTime = val.getMaxTime();
+
+    Assert.assertEquals(DateTimes.of("2011-01-15T00:00:00.000Z"), minTime);
+    Assert.assertEquals(DateTimes.of("2011-01-15T01:00:00.000Z"), maxTime);
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testFilteredTimeBoundaryQueryNoMatches() throws IOException
@@ -216,6 +247,22 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
   }
 
+  @Test(expected = UOE.class)
+  @SuppressWarnings("unchecked")
+  public void testTimeBoundaryArrayResults()
+  {
+    TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                   .dataSource("testing")
+                                                   .bound(null)
+                                                   .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
+    new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+        timeBoundaryQuery,
+        runner.run(QueryPlus.wrap(timeBoundaryQuery), context)
+    ).toList();
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeBoundaryMax()
@@ -235,6 +282,26 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeBoundaryMaxArraysResults()
+  {
+    TimeBoundaryQuery maxTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                   .dataSource("testing")
+                                                   .bound(TimeBoundaryQuery.MAX_TIME)
+                                                   .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
+    List<Object[]> maxTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+        maxTimeBoundaryQuery,
+        runner.run(QueryPlus.wrap(maxTimeBoundaryQuery), context)
+    ).toList();
+
+    Long maxTimeMillis = (Long) maxTime.get(0)[0];
+    Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), new DateTime(maxTimeMillis, DateTimeZone.UTC));
+    Assert.assertEquals(1, maxTime.size());
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testTimeBoundaryMin()
@@ -254,6 +321,26 @@ public class TimeBoundaryQueryRunnerTest
     Assert.assertNull(maxTime);
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testTimeBoundaryMinArraysResults()
+  {
+    TimeBoundaryQuery minTimeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
+                                                   .dataSource("testing")
+                                                   .bound(TimeBoundaryQuery.MIN_TIME)
+                                                   .build();
+    ResponseContext context = ConcurrentResponseContext.createEmpty();
+    context.initializeMissingSegments();
+    List<Object[]> minTime = new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
+        minTimeBoundaryQuery,
+        runner.run(QueryPlus.wrap(minTimeBoundaryQuery), context)
+    ).toList();
+
+    Long minTimeMillis = (Long) minTime.get(0)[0];
+    Assert.assertEquals(DateTimes.of("2011-01-12T00:00:00.000Z"), new DateTime(minTimeMillis, DateTimeZone.UTC));
+    Assert.assertEquals(1, minTime.size());
+  }
+
   @Test
   public void testMergeResults()
   {
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 9e7c234c0c..0dfe0d5362 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -55,6 +55,8 @@ import org.apache.druid.query.scan.ScanQueryConfig;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanQueryQueryToolChest;
 import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -344,6 +346,7 @@ public class QueryStackTests
                 )
             )
             .put(GroupByQuery.class, groupByQueryRunnerFactory)
+            .put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER))
             .build()
     );
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 98f2dfe045..f9f5083f17 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -49,7 +49,11 @@ import org.apache.druid.query.DataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
@@ -59,6 +63,7 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.topn.DimensionTopNMetricSpec;
 import org.apache.druid.query.topn.InvertedTopNMetricSpec;
@@ -799,6 +804,11 @@ public class DruidQuery
       }
     }
 
+    final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery();
+    if (timeBoundaryQuery != null) {
+      return timeBoundaryQuery;
+    }
+
     final TimeseriesQuery tsQuery = toTimeseriesQuery(queryFeatureInspector);
     if (tsQuery != null) {
       return tsQuery;
@@ -822,6 +832,69 @@ public class DruidQuery
     throw new CannotBuildQueryException("Cannot convert query parts into an actual query");
   }
 
+  /**
+   * Return this query as a TimeBoundary query, or null if this query is not compatible with Timeseries.
+   *
+   * @return a TimeBoundaryQuery if possible. null if it is not possible to construct one.
+   */
+  @Nullable
+  private TimeBoundaryQuery toTimeBoundaryQuery()
+  {
+    if (grouping == null
+        || grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
+        || grouping.getHavingFilter() != null
+        || selectProjection != null) {
+      return null;
+    }
+
+    if (sorting != null && sorting.getOffsetLimit().hasOffset()) {
+      // Timeboundary cannot handle offsets.
+      return null;
+    }
+
+    if (grouping.getDimensions().isEmpty() &&
+        grouping.getPostAggregators().isEmpty() &&
+        grouping.getAggregatorFactories().size() == 1) { // currently only handles max(__time) or min(__time) not both
+      boolean minTime;
+      AggregatorFactory aggregatorFactory = Iterables.getOnlyElement(grouping.getAggregatorFactories());
+      if (aggregatorFactory instanceof LongMaxAggregatorFactory ||
+          aggregatorFactory instanceof LongMinAggregatorFactory) {
+        SimpleLongAggregatorFactory minMaxFactory = (SimpleLongAggregatorFactory) aggregatorFactory;
+        String fieldName = minMaxFactory.getFieldName();
+        if (fieldName == null ||
+            !fieldName.equals(ColumnHolder.TIME_COLUMN_NAME) ||
+            (minMaxFactory.getExpression() != null && !minMaxFactory.getExpression().isEmpty())) {
+          return null;
+        }
+        minTime = aggregatorFactory instanceof LongMinAggregatorFactory;
+      } else {
+        return null;
+      }
+      final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration(
+          dataSource,
+          filter,
+          virtualColumnRegistry
+      );
+      final DataSource newDataSource = dataSourceFiltrationPair.lhs;
+      final Filtration filtration = dataSourceFiltrationPair.rhs;
+      String bound = minTime ? TimeBoundaryQuery.MIN_TIME : TimeBoundaryQuery.MAX_TIME;
+      HashMap<String, Object> context = new HashMap<>(plannerContext.getQueryContext().getMergedParams());
+      if (minTime) {
+        context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName());
+      } else {
+        context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, aggregatorFactory.getName());
+      }
+      return new TimeBoundaryQuery(
+          newDataSource,
+          filtration.getQuerySegmentSpec(),
+          bound,
+          filtration.getDimFilter(),
+          context
+      );
+    }
+    return null;
+  }
+
   /**
    * Return this query as a Timeseries query, or null if this query is not compatible with Timeseries.
    *
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index 2d5b3b043d..1c4227102f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -72,6 +72,7 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.query.topn.DimensionTopNMetricSpec;
 import org.apache.druid.query.topn.InvertedTopNMetricSpec;
 import org.apache.druid.query.topn.NumericTopNMetricSpec;
@@ -2387,6 +2388,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
       cannotVectorize();
     }
 
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT DISTINCT __time FROM druid.foo WHERE __time IN (SELECT MAX(__time) FROM druid.foo)",
         queryContext,
@@ -2396,14 +2399,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                     join(
                         new TableDataSource(CalciteTests.DATASOURCE1),
                         new QueryDataSource(
-                            Druids.newTimeseriesQueryBuilder()
-                                .dataSource(CalciteTests.DATASOURCE1)
-                                .intervals(querySegmentSpec(Filtration.eternity()))
-                                .granularity(Granularities.ALL)
-                                .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                                .context(QUERY_CONTEXT_DEFAULT)
-                                .build()
-                                .withOverriddenContext(queryContext)
+                            Druids.newTimeBoundaryQueryBuilder()
+                                  .dataSource(CalciteTests.DATASOURCE1)
+                                  .intervals(querySegmentSpec(Filtration.eternity()))
+                                  .bound(TimeBoundaryQuery.MAX_TIME)
+                                  .context(maxTimeQueryContext)
+                                  .build()
                         ),
                         "j0.",
                         equalsCondition(
@@ -2433,6 +2434,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
     // Cannot vectorize JOIN operator.
     cannotVectorize();
 
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT DISTINCT __time FROM druid.foo WHERE __time NOT IN (SELECT MAX(__time) FROM druid.foo)",
         queryContext,
@@ -2446,13 +2449,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                                 GroupByQuery
                                     .builder()
                                     .setDataSource(
-                                        Druids.newTimeseriesQueryBuilder()
-                                            .dataSource(CalciteTests.DATASOURCE1)
-                                            .intervals(querySegmentSpec(Filtration.eternity()))
-                                            .granularity(Granularities.ALL)
-                                            .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                                            .context(QUERY_CONTEXT_DEFAULT)
-                                            .build()
+                                        Druids.newTimeBoundaryQueryBuilder()
+                                              .dataSource(CalciteTests.DATASOURCE1)
+                                              .intervals(querySegmentSpec(Filtration.eternity()))
+                                              .bound(TimeBoundaryQuery.MAX_TIME)
+                                              .context(maxTimeQueryContext)
+                                              .build()
                                     )
                                     .setInterval(querySegmentSpec(Filtration.eternity()))
                                     .setGranularity(Granularities.ALL)
@@ -3566,6 +3568,8 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
       cannotVectorize();
     }
 
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT dim1, COUNT(*) FROM foo\n"
             + "WHERE dim1 IN ('abc', 'def')"
@@ -3580,28 +3584,26 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                         join(
                             new TableDataSource(CalciteTests.DATASOURCE1),
                             new QueryDataSource(
-                                Druids.newTimeseriesQueryBuilder()
-                                    .dataSource(CalciteTests.DATASOURCE1)
-                                    .intervals(querySegmentSpec(Filtration.eternity()))
-                                    .granularity(Granularities.ALL)
-                                    .filters(selector("cnt", "1", null))
-                                    .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                                    .context(QUERY_CONTEXT_DEFAULT)
-                                    .build()
+                                Druids.newTimeBoundaryQueryBuilder()
+                                      .dataSource(CalciteTests.DATASOURCE1)
+                                      .intervals(querySegmentSpec(Filtration.eternity()))
+                                      .bound(TimeBoundaryQuery.MAX_TIME)
+                                      .filters(selector("cnt", "1", null))
+                                      .context(maxTimeQueryContext)
+                                      .build()
                             ),
                             "j0.",
                             "(\"__time\" == \"j0.a0\")",
                             JoinType.INNER
                         ),
                         new QueryDataSource(
-                            Druids.newTimeseriesQueryBuilder()
-                                .dataSource(CalciteTests.DATASOURCE1)
-                                .intervals(querySegmentSpec(Filtration.eternity()))
-                                .granularity(Granularities.ALL)
-                                .filters(not(selector("cnt", "2", null)))
-                                .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                                .context(QUERY_CONTEXT_DEFAULT)
-                                .build()
+                            Druids.newTimeBoundaryQueryBuilder()
+                                  .dataSource(CalciteTests.DATASOURCE1)
+                                  .intervals(querySegmentSpec(Filtration.eternity()))
+                                  .bound(TimeBoundaryQuery.MAX_TIME)
+                                  .filters(not(selector("cnt", "2", null)))
+                                  .context(maxTimeQueryContext)
+                                  .build()
                         ),
                         "_j0.",
                         "(\"__time\" == \"_j0.a0\")",
@@ -3626,6 +3628,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
   {
     cannotVectorize();
 
+    Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
+    minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT dim1, COUNT(*) FROM foo\n"
             + "WHERE dim1 IN ('abc', 'def')\n"
@@ -3641,13 +3647,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                             join(
                                 new TableDataSource(CalciteTests.DATASOURCE1),
                                 new QueryDataSource(
-                                    Druids.newTimeseriesQueryBuilder()
-                                        .dataSource(CalciteTests.DATASOURCE1)
-                                        .intervals(querySegmentSpec(Filtration.eternity()))
-                                        .granularity(Granularities.ALL)
-                                        .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                                        .context(QUERY_CONTEXT_DEFAULT)
-                                        .build()
+                                    Druids.newTimeBoundaryQueryBuilder()
+                                          .dataSource(CalciteTests.DATASOURCE1)
+                                          .intervals(querySegmentSpec(Filtration.eternity()))
+                                          .bound(TimeBoundaryQuery.MAX_TIME)
+                                          .context(maxTimeQueryContext)
+                                          .build()
                                 ),
                                 "j0.",
                                 "(\"__time\" == \"j0.a0\")",
@@ -3657,15 +3662,12 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                                 GroupByQuery.builder()
                                     .setDataSource(
                                         new QueryDataSource(
-                                            Druids.newTimeseriesQueryBuilder()
-                                                .dataSource(CalciteTests.DATASOURCE1)
-                                                .intervals(querySegmentSpec(Filtration.eternity()))
-                                                .granularity(Granularities.ALL)
-                                                .aggregators(
-                                                    new LongMinAggregatorFactory("a0", "__time")
-                                                )
-                                                .context(QUERY_CONTEXT_DEFAULT)
-                                                .build()
+                                            Druids.newTimeBoundaryQueryBuilder()
+                                                  .dataSource(CalciteTests.DATASOURCE1)
+                                                  .intervals(querySegmentSpec(Filtration.eternity()))
+                                                  .bound(TimeBoundaryQuery.MIN_TIME)
+                                                  .context(minTimeQueryContext)
+                                                  .build()
                                         )
                                     )
                                     .setInterval(querySegmentSpec(Filtration.eternity()))
@@ -3730,6 +3732,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
   {
     cannotVectorize();
 
+    Map<String, Object> minTimeQueryContext = new HashMap<>(queryContext);
+    minTimeQueryContext.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    Map<String, Object> maxTimeQueryContext = new HashMap<>(queryContext);
+    maxTimeQueryContext.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
     testQuery(
         "SELECT dim1, COUNT(*) FROM\n"
             + "foo\n"
@@ -3745,26 +3751,24 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                         join(
                             new TableDataSource(CalciteTests.DATASOURCE1),
                             new QueryDataSource(
-                                Druids.newTimeseriesQueryBuilder()
-                                    .dataSource(CalciteTests.DATASOURCE1)
-                                    .intervals(querySegmentSpec(Filtration.eternity()))
-                                    .granularity(Granularities.ALL)
-                                    .aggregators(new LongMaxAggregatorFactory("a0", "__time"))
-                                    .context(QUERY_CONTEXT_DEFAULT)
-                                    .build()
+                                Druids.newTimeBoundaryQueryBuilder()
+                                      .dataSource(CalciteTests.DATASOURCE1)
+                                      .intervals(querySegmentSpec(Filtration.eternity()))
+                                      .bound(TimeBoundaryQuery.MAX_TIME)
+                                      .context(maxTimeQueryContext)
+                                      .build()
                             ),
                             "j0.",
                             "(\"__time\" == \"j0.a0\")",
                             JoinType.INNER
                         ),
                         new QueryDataSource(
-                            Druids.newTimeseriesQueryBuilder()
-                                .dataSource(CalciteTests.DATASOURCE1)
-                                .intervals(querySegmentSpec(Filtration.eternity()))
-                                .granularity(Granularities.ALL)
-                                .aggregators(new LongMinAggregatorFactory("a0", "__time"))
-                                .context(QUERY_CONTEXT_DEFAULT)
-                                .build()
+                            Druids.newTimeBoundaryQueryBuilder()
+                                  .dataSource(CalciteTests.DATASOURCE1)
+                                  .intervals(querySegmentSpec(Filtration.eternity()))
+                                  .bound(TimeBoundaryQuery.MIN_TIME)
+                                  .context(minTimeQueryContext)
+                                  .build()
                         ),
                         "_j0.",
                         "(\"__time\" == \"_j0.a0\")",
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
new file mode 100644
index 0000000000..74e42c5365
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class CalciteTimeBoundaryQueryTest extends BaseCalciteQueryTest
+{
+  // __time for foo is [2000-01-01, 2000-01-02, 2000-01-03, 2001-01-01, 2001-01-02, 2001-01-03]
+  @Test
+  public void testMaxTimeQuery() throws Exception
+  {
+    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    context.put(TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, "a0");
+    testQuery(
+        "SELECT MAX(__time) AS maxTime FROM foo",
+        ImmutableList.of(
+            Druids.newTimeBoundaryQueryBuilder()
+                  .dataSource("foo")
+                  .bound(TimeBoundaryQuery.MAX_TIME)
+                  .context(context)
+                  .build()
+        ),
+        ImmutableList.of(new Object[]{DateTimes.of("2001-01-03").getMillis()})
+    );
+  }
+
+  @Test
+  public void testMinTimeQuery() throws Exception
+  {
+    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    testQuery(
+        "SELECT MIN(__time) AS minTime FROM foo",
+        ImmutableList.of(
+            Druids.newTimeBoundaryQueryBuilder()
+                  .dataSource("foo")
+                  .bound(TimeBoundaryQuery.MIN_TIME)
+                  .context(context)
+                  .build()
+        ),
+        ImmutableList.of(new Object[]{DateTimes.of("2000-01-01").getMillis()})
+    );
+  }
+
+  @Test
+  public void testMinTimeQueryWithFilters() throws Exception
+  {
+    HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    context.put(TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, "a0");
+    testQuery(
+        "SELECT MIN(__time) AS minTime FROM foo where __time >= '2001-01-01' and __time < '2003-01-01'",
+        ImmutableList.of(
+            Druids.newTimeBoundaryQueryBuilder()
+                  .dataSource("foo")
+                  .intervals(
+                      new MultipleIntervalSegmentSpec(
+                          ImmutableList.of(Intervals.of("2001-01-01T00:00:00.000Z/2003-01-01T00:00:00.000Z"))
+                      )
+                  )
+                  .bound(TimeBoundaryQuery.MIN_TIME)
+                  .context(context)
+                  .build()
+        ),
+        ImmutableList.of(new Object[]{DateTimes.of("2001-01-01").getMillis()})
+    );
+  }
+
+  // Currently, if both min(__time) and max(__time) are present, we don't convert it
+  // to a timeBoundary query. (ref : https://github.com/apache/druid/issues/12479)
+  @Test
+  public void testMinMaxTimeQuery() throws Exception
+  {
+    testQuery(
+        "SELECT MIN(__time) AS minTime, MAX(__time) as maxTime FROM foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource("foo")
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .aggregators(
+                      new LongMinAggregatorFactory("a0", "__time"),
+                      new LongMaxAggregatorFactory("a1", "__time")
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(new Object[]{
+            DateTimes.of("2000-01-01").getMillis(),
+            DateTimes.of("2001-01-03").getMillis()
+        })
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org