You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2018/08/01 21:39:41 UTC

[incubator-druid] branch master updated: Optimize filtered aggs with interval filters in per-segment queries (#5857)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b9c445c  Optimize filtered aggs with interval filters in per-segment queries (#5857)
b9c445c is described below

commit b9c445c7803bc2bffbd59cb619a80cd092ced349
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Wed Aug 1 14:39:38 2018 -0700

    Optimize filtered aggs with interval filters in per-segment queries (#5857)
    
    * Optimize per-segment queries
    
    * Always optimize, add unit test
    
    * PR comments
    
    * Only run IntervalDimFilter optimization on __time column
    
    * PR comments
    
    * Checkstyle fix
    
    * Add test for non __time column
---
 .../query/timecompare/TimeCompareBenchmark.java    | 457 +++++++++++++++++++++
 .../query/PerSegmentOptimizingQueryRunner.java     |  59 +++
 .../query/PerSegmentQueryOptimizationContext.java  |  43 ++
 processing/src/main/java/io/druid/query/Query.java |   5 +
 .../src/main/java/io/druid/query/QueryPlus.java    |   5 +
 .../druid/query/aggregation/AggregatorFactory.java |   9 +
 .../io/druid/query/aggregation/AggregatorUtil.java |   3 +
 .../aggregation/FilteredAggregatorFactory.java     |  65 +++
 .../aggregation/SuppressedAggregatorFactory.java   | 374 +++++++++++++++++
 .../io/druid/query/timeseries/TimeseriesQuery.java |  17 +
 .../main/java/io/druid/query/topn/TopNQuery.java   |  17 +
 .../query/topn/PerSegmentQueryOptimizeTest.java    | 111 +++++
 .../druid/server/coordination/ServerManager.java   |  76 ++--
 13 files changed, 1212 insertions(+), 29 deletions(-)

diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
new file mode 100644
index 0000000..97522d5
--- /dev/null
+++ b/benchmarks/src/main/java/io/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.benchmark.query.timecompare;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import io.druid.benchmark.datagen.BenchmarkDataGenerator;
+import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import io.druid.benchmark.datagen.BenchmarkSchemas;
+import io.druid.benchmark.query.QueryBenchmarkUtil;
+import io.druid.collections.StupidPool;
+import io.druid.data.input.InputRow;
+import io.druid.hll.HyperLogLogHash;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.math.expr.ExprMacroTable;
+import io.druid.offheap.OffheapBufferGenerator;
+import io.druid.query.Druids;
+import io.druid.query.FinalizeResultsQueryRunner;
+import io.druid.query.PerSegmentOptimizingQueryRunner;
+import io.druid.query.PerSegmentQueryOptimizationContext;
+import io.druid.query.Query;
+import io.druid.query.QueryPlus;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryToolChest;
+import io.druid.query.Result;
+import io.druid.query.SegmentDescriptor;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.FilteredAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import io.druid.query.filter.IntervalDimFilter;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.QuerySegmentSpec;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.query.topn.TopNQueryBuilder;
+import io.druid.query.topn.TopNQueryConfig;
+import io.druid.query.topn.TopNQueryQueryToolChest;
+import io.druid.query.topn.TopNQueryRunnerFactory;
+import io.druid.query.topn.TopNResultValue;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMergerV9;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.QueryableIndexSegment;
+import io.druid.segment.column.Column;
+import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.serde.ComplexMetrics;
+import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.Interval;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 50)
+@Measurement(iterations = 200)
+public class TimeCompareBenchmark
+{
+  @Param({"10"})
+  private int numSegments;
+
+  @Param({"100000"})
+  private int rowsPerSegment;
+
+  @Param({"100"})
+  private int threshold;
+
+  protected static final Map<String, String> scriptDoubleSum = Maps.newHashMap();
+  static {
+    scriptDoubleSum.put("fnAggregate", "function aggregate(current, a) { return current + a }");
+    scriptDoubleSum.put("fnReset", "function reset() { return 0 }");
+    scriptDoubleSum.put("fnCombine", "function combine(a,b) { return a + b }");
+  }
+
+  private static final Logger log = new Logger(TimeCompareBenchmark.class);
+  private static final int RNG_SEED = 9999;
+  private static final IndexMergerV9 INDEX_MERGER_V9;
+  private static final IndexIO INDEX_IO;
+  public static final ObjectMapper JSON_MAPPER;
+
+  private List<IncrementalIndex> incIndexes;
+  private List<QueryableIndex> qIndexes;
+
+  private QueryRunnerFactory topNFactory;
+  private Query topNQuery;
+  private QueryRunner topNRunner;
+
+
+  private QueryRunnerFactory timeseriesFactory;
+  private Query timeseriesQuery;
+  private QueryRunner timeseriesRunner;
+
+  private BenchmarkSchemaInfo schemaInfo;
+  private File tmpDir;
+  private Interval[] segmentIntervals;
+
+  private ExecutorService executorService;
+
+  static {
+    JSON_MAPPER = new DefaultObjectMapper();
+    InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(ExprMacroTable.class, ExprMacroTable.nil());
+    JSON_MAPPER.setInjectableValues(injectableValues);
+
+    INDEX_IO = new IndexIO(
+        JSON_MAPPER,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0;
+          }
+        }
+    );
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+  }
+
+  private static final Map<String, Map<String, Object>> SCHEMA_QUERY_MAP = new LinkedHashMap<>();
+
+  private void setupQueries()
+  {
+    // queries for the basic schema
+    Map<String, Object> basicQueries = new LinkedHashMap<>();
+    BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+
+    QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
+
+    long startMillis = basicSchema.getDataInterval().getStartMillis();
+    long endMillis = basicSchema.getDataInterval().getEndMillis();
+    long half = (endMillis - startMillis) / 2;
+
+    Interval recent = Intervals.utc(half, endMillis);
+    Interval previous = Intervals.utc(startMillis, half);
+
+    log.info("Recent interval: " + recent);
+    log.info("Previous interval: " + previous);
+
+    { // basic.topNTimeCompare
+      List<AggregatorFactory> queryAggs = new ArrayList<>();
+      queryAggs.add(
+          new FilteredAggregatorFactory(
+              //jsAgg1,
+              new LongSumAggregatorFactory(
+                  "sumLongSequential", "sumLongSequential"
+              ),
+              new IntervalDimFilter(
+                  Column.TIME_COLUMN_NAME,
+                  Collections.singletonList(recent),
+                  null
+              )
+          )
+      );
+      queryAggs.add(
+          new FilteredAggregatorFactory(
+              new LongSumAggregatorFactory(
+                  "_cmp_sumLongSequential", "sumLongSequential"
+              ),
+              new IntervalDimFilter(
+                  Column.TIME_COLUMN_NAME,
+                  Collections.singletonList(previous),
+                  null
+              )
+          )
+      );
+
+      TopNQueryBuilder queryBuilderA = new TopNQueryBuilder()
+          .dataSource("blah")
+          .granularity(Granularities.ALL)
+          .dimension("dimUniform")
+          .metric("sumLongSequential")
+          .intervals(intervalSpec)
+          .aggregators(queryAggs)
+          .threshold(threshold);
+
+      topNQuery = queryBuilderA.build();
+      topNFactory = new TopNQueryRunnerFactory(
+          new StupidPool<>(
+              "TopNBenchmark-compute-bufferPool",
+              new OffheapBufferGenerator("compute", 250000000),
+              0,
+              Integer.MAX_VALUE
+          ),
+          new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
+          QueryBenchmarkUtil.NOOP_QUERYWATCHER
+      );
+
+      basicQueries.put("topNTimeCompare", queryBuilderA);
+    }
+    { // basic.timeseriesTimeCompare
+      List<AggregatorFactory> queryAggs = new ArrayList<>();
+      queryAggs.add(
+          new FilteredAggregatorFactory(
+              new LongSumAggregatorFactory(
+                  "sumLongSequential", "sumLongSequential"
+              ),
+              new IntervalDimFilter(
+                  Column.TIME_COLUMN_NAME,
+                  Collections.singletonList(recent),
+                  null
+              )
+          )
+      );
+      queryAggs.add(
+          new FilteredAggregatorFactory(
+              new LongSumAggregatorFactory(
+                  "_cmp_sumLongSequential", "sumLongSequential"
+              ),
+              new IntervalDimFilter(
+                  Column.TIME_COLUMN_NAME,
+                  Collections.singletonList(previous),
+                  null
+              )
+          )
+      );
+
+      Druids.TimeseriesQueryBuilder timeseriesQueryBuilder = Druids.newTimeseriesQueryBuilder()
+                                                                   .dataSource("blah")
+                                                                   .granularity(Granularities.ALL)
+                                                                   .intervals(intervalSpec)
+                                                                   .aggregators(queryAggs)
+                                                                   .descending(false);
+
+      timeseriesQuery = timeseriesQueryBuilder.build();
+      timeseriesFactory = new TimeseriesQueryRunnerFactory(
+          new TimeseriesQueryQueryToolChest(
+              QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
+          ),
+          new TimeseriesQueryEngine(),
+          QueryBenchmarkUtil.NOOP_QUERYWATCHER
+      );
+    }
+
+    SCHEMA_QUERY_MAP.put("basic", basicQueries);
+  }
+
+
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
+    }
+
+    executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
+
+    setupQueries();
+
+    String schemaName = "basic";
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+    segmentIntervals = new Interval[numSegments];
+
+    long startMillis = schemaInfo.getDataInterval().getStartMillis();
+    long endMillis = schemaInfo.getDataInterval().getEndMillis();
+    long partialIntervalMillis = (endMillis - startMillis) / numSegments;
+    for (int i = 0; i < numSegments; i++) {
+      long partialEndMillis = startMillis + partialIntervalMillis;
+      segmentIntervals[i] = Intervals.utc(startMillis, partialEndMillis);
+      log.info("Segment [%d] with interval [%s]", i, segmentIntervals[i]);
+      startMillis = partialEndMillis;
+    }
+
+    incIndexes = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      log.info("Generating rows for segment " + i);
+
+      BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+          schemaInfo.getColumnSchemas(),
+          RNG_SEED + i,
+          segmentIntervals[i],
+          rowsPerSegment
+      );
+
+      IncrementalIndex incIndex = makeIncIndex();
+
+      for (int j = 0; j < rowsPerSegment; j++) {
+        InputRow row = gen.nextRow();
+        if (j % 10000 == 0) {
+          log.info(j + " rows generated.");
+        }
+        incIndex.add(row);
+      }
+      incIndexes.add(incIndex);
+    }
+
+    tmpDir = Files.createTempDir();
+    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
+
+    qIndexes = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      File indexFile = INDEX_MERGER_V9.persist(
+          incIndexes.get(i),
+          tmpDir,
+          new IndexSpec(),
+          null
+      );
+
+      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
+      qIndexes.add(qIndex);
+    }
+
+    List<QueryRunner<Result<TopNResultValue>>> singleSegmentRunners = Lists.newArrayList();
+    QueryToolChest toolChest = topNFactory.getToolchest();
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = "qIndex" + i;
+      QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+          topNFactory,
+          segmentName,
+          new QueryableIndexSegment(segmentName, qIndexes.get(i))
+      );
+      singleSegmentRunners.add(
+          new PerSegmentOptimizingQueryRunner<>(
+              toolChest.preMergeQueryDecoration(runner),
+              new PerSegmentQueryOptimizationContext(
+                  new SegmentDescriptor(segmentIntervals[i], "1", 0)
+              )
+          )
+      );
+    }
+
+    topNRunner = toolChest.postMergeQueryDecoration(
+        new FinalizeResultsQueryRunner<>(
+            toolChest.mergeResults(topNFactory.mergeRunners(executorService, singleSegmentRunners)),
+            toolChest
+        )
+    );
+
+    List<QueryRunner<Result<TimeseriesResultValue>>> singleSegmentRunnersT = Lists.newArrayList();
+    QueryToolChest toolChestT = timeseriesFactory.getToolchest();
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = "qIndex" + i;
+      QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+          timeseriesFactory,
+          segmentName,
+          new QueryableIndexSegment(segmentName, qIndexes.get(i))
+      );
+      singleSegmentRunnersT.add(
+          new PerSegmentOptimizingQueryRunner<>(
+              toolChestT.preMergeQueryDecoration(runner),
+              new PerSegmentQueryOptimizationContext(
+                  new SegmentDescriptor(segmentIntervals[i], "1", 0)
+              )
+          )
+      );
+    }
+
+    timeseriesRunner = toolChestT.postMergeQueryDecoration(
+        new FinalizeResultsQueryRunner<>(
+            toolChestT.mergeResults(timeseriesFactory.mergeRunners(executorService, singleSegmentRunnersT)),
+            toolChestT
+        )
+    );
+  }
+
+  @TearDown
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new IncrementalIndex.Builder()
+        .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
+        .setReportParseExceptions(false)
+        .setMaxRowCount(rowsPerSegment)
+        .buildOnheap();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexTopN(Blackhole blackhole)
+  {
+    Sequence<Result<TopNResultValue>> queryResult = topNRunner.run(
+        QueryPlus.wrap(topNQuery),
+        Maps.<String, Object>newHashMap()
+    );
+    List<Result<TopNResultValue>> results = queryResult.toList();
+
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexTimeseries(Blackhole blackhole)
+  {
+    Sequence<Result<TimeseriesResultValue>> queryResult = timeseriesRunner.run(
+        QueryPlus.wrap(timeseriesQuery),
+        Maps.<String, Object>newHashMap()
+    );
+    List<Result<TimeseriesResultValue>> results = queryResult.toList();
+
+    for (Result<TimeseriesResultValue> result : results) {
+      blackhole.consume(result);
+    }
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/PerSegmentOptimizingQueryRunner.java b/processing/src/main/java/io/druid/query/PerSegmentOptimizingQueryRunner.java
new file mode 100644
index 0000000..11e7f2f
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/PerSegmentOptimizingQueryRunner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query;
+
+import io.druid.java.util.common.guava.Sequence;
+
+import java.util.Map;
+
+/**
+ * This runner optimizes queries made on a single segment, using per-segment information,
+ * before submitting the queries to the base runner.
+ *
+ * Example optimizations include adjusting query filters based on per-segment information, such as intervals.
+ *
+ * This query runner should only wrap base query runners that will
+ * be used to query a single segment (i.e., when the query reaches a historical node).
+ *
+ * @param <T>
+ */
+public class PerSegmentOptimizingQueryRunner<T> implements QueryRunner<T>
+{
+  private final QueryRunner<T> base;
+  private final PerSegmentQueryOptimizationContext optimizationContext;
+
+  public PerSegmentOptimizingQueryRunner(
+      QueryRunner<T> base,
+      PerSegmentQueryOptimizationContext optimizationContext
+  )
+  {
+    this.base = base;
+    this.optimizationContext = optimizationContext;
+  }
+
+  @Override
+  public Sequence<T> run(final QueryPlus<T> input, final Map<String, Object> responseContext)
+  {
+    return base.run(
+        input.optimizeForSegment(optimizationContext),
+        responseContext
+    );
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/PerSegmentQueryOptimizationContext.java b/processing/src/main/java/io/druid/query/PerSegmentQueryOptimizationContext.java
new file mode 100644
index 0000000..993ccf1
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/PerSegmentQueryOptimizationContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query;
+
+/**
+ * Holds information about a single segment that Query objects can use to optimize themselves
+ * when they are run on that single segment.
+ *
+ * @see PerSegmentOptimizingQueryRunner
+ */
+public class PerSegmentQueryOptimizationContext
+{
+  private final SegmentDescriptor segmentDescriptor;
+
+  public PerSegmentQueryOptimizationContext(
+      SegmentDescriptor segmentDescriptor
+  )
+  {
+    this.segmentDescriptor = segmentDescriptor;
+  }
+
+  public SegmentDescriptor getSegmentDescriptor()
+  {
+    return segmentDescriptor;
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java
index 6e8e69e..5212bd5 100644
--- a/processing/src/main/java/io/druid/query/Query.java
+++ b/processing/src/main/java/io/druid/query/Query.java
@@ -109,4 +109,9 @@ public interface Query<T>
   String getId();
 
   Query<T> withDataSource(DataSource dataSource);
+
+  default Query<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return this;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java
index f9a33b3..c8110bb 100644
--- a/processing/src/main/java/io/druid/query/QueryPlus.java
+++ b/processing/src/main/java/io/druid/query/QueryPlus.java
@@ -144,4 +144,9 @@ public final class QueryPlus<T>
   {
     return query.getRunner(walker).run(this, context);
   }
+
+  public QueryPlus<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return new QueryPlus<>(query.optimizeForSegment(optimizationContext), queryMetrics, identity);
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
index 65858c5..22ec072 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java
@@ -23,6 +23,7 @@ import io.druid.guice.annotations.ExtensionPoint;
 import io.druid.java.util.common.Cacheable;
 import io.druid.java.util.common.UOE;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.query.PerSegmentQueryOptimizationContext;
 import io.druid.segment.ColumnSelectorFactory;
 
 import javax.annotation.Nullable;
@@ -142,6 +143,14 @@ public abstract class AggregatorFactory implements Cacheable
   public abstract int getMaxIntermediateSize();
 
   /**
+   * Return a potentially optimized form of this AggregatorFactory for per-segment queries.
+   */
+  public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return this;
+  }
+
+  /**
    * Merges the list of AggregatorFactory[] (presumable from metadata of some segments being merged) and
    * returns merged AggregatorFactory[] (for the metadata for merged segment).
    * Null is returned if it is not possible to do the merging for any of the following reason.
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
index eedd095..8f7910d 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
@@ -98,6 +98,9 @@ public class AggregatorUtil
   public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
   public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;
 
+  // Suppressed aggregator
+  public static final byte SUPPRESSED_AGG_CACHE_TYPE_ID = 0x2D;
+
   /**
    * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
    *
diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java
index 686b793..c3dfdc4 100644
--- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java
@@ -21,12 +21,17 @@ package io.druid.query.aggregation;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import io.druid.query.PerSegmentQueryOptimizationContext;
 import io.druid.query.filter.DimFilter;
+import io.druid.query.filter.IntervalDimFilter;
 import io.druid.query.filter.ValueMatcher;
 import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
 import io.druid.segment.filter.Filters;
+import org.joda.time.Interval;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
@@ -140,6 +145,66 @@ public class FilteredAggregatorFactory extends AggregatorFactory
     return delegate.getMaxIntermediateSize();
   }
 
+  @Override
+  public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    if (filter instanceof IntervalDimFilter) {
+      IntervalDimFilter intervalDimFilter = ((IntervalDimFilter) filter);
+      if (intervalDimFilter.getExtractionFn() != null) {
+        // no support for extraction functions right now
+        return this;
+      }
+
+      if (!intervalDimFilter.getDimension().equals(Column.TIME_COLUMN_NAME)) {
+        // segment time boundary optimization only applies when we filter on __time
+        return this;
+      }
+
+      Interval segmentInterval = optimizationContext.getSegmentDescriptor().getInterval();
+      List<Interval> filterIntervals = intervalDimFilter.getIntervals();
+      List<Interval> excludedFilterIntervals = new ArrayList<>();
+      List<Interval> effectiveFilterIntervals = new ArrayList<>();
+
+      boolean segmentIsCovered = false;
+      for (Interval filterInterval : filterIntervals) {
+        Interval overlap = filterInterval.overlap(segmentInterval);
+        if (overlap == null) {
+          excludedFilterIntervals.add(filterInterval);
+          continue;
+        }
+
+        if (overlap.equals(segmentInterval)) {
+          segmentIsCovered = true;
+          break;
+        } else {
+          // clip the overlapping interval to the segment time boundaries
+          effectiveFilterIntervals.add(overlap);
+        }
+      }
+
+      // we can skip applying this filter, everything in the segment will match
+      if (segmentIsCovered) {
+        return delegate;
+      }
+
+      // we can skip this filter, nothing in the segment would match
+      if (excludedFilterIntervals.size() == filterIntervals.size()) {
+        return new SuppressedAggregatorFactory(delegate);
+      }
+
+      return new FilteredAggregatorFactory(
+          delegate,
+          new IntervalDimFilter(
+              intervalDimFilter.getDimension(),
+              effectiveFilterIntervals,
+              intervalDimFilter.getExtractionFn()
+          )
+      );
+    } else {
+      return this;
+    }
+  }
+
   @JsonProperty
   public AggregatorFactory getAggregator()
   {
diff --git a/processing/src/main/java/io/druid/query/aggregation/SuppressedAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/SuppressedAggregatorFactory.java
new file mode 100644
index 0000000..1d393ce
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/SuppressedAggregatorFactory.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import io.druid.query.PerSegmentQueryOptimizationContext;
+import io.druid.query.cache.CacheKeyBuilder;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This AggregatorFactory is meant for wrapping delegate aggregators for optimization purposes.
+ *
+ * The wrapper suppresses the aggregate() method for the underlying delegate, while leaving
+ * the behavior of other calls unchanged.
+ *
+ * This wrapper is meant to be used when an optimization decides that an aggregator can be entirely skipped
+ * (e.g., a FilteredAggregatorFactory where the filter condition will never match).
+ */
+public class SuppressedAggregatorFactory extends AggregatorFactory
+{
+  private final AggregatorFactory delegate;
+
+  public SuppressedAggregatorFactory(
+      AggregatorFactory delegate
+  )
+  {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    return new SuppressedAggregator(delegate.factorize(metricFactory));
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    return new SuppressedBufferAggregator(delegate.factorizeBuffered(metricFactory));
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return delegate.getComparator();
+  }
+
+  @Override
+  public Object combine(Object lhs, Object rhs)
+  {
+    return delegate.combine(lhs, rhs);
+  }
+
+  @Override
+  public AggregateCombiner makeAggregateCombiner()
+  {
+    return delegate.makeAggregateCombiner();
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return delegate.getCombiningFactory();
+  }
+
+  @Override
+  public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
+  {
+    return delegate.getMergingFactory(other);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return delegate.getRequiredColumns();
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    return delegate.deserialize(object);
+  }
+
+  @Override
+  public Object finalizeComputation(Object object)
+  {
+    return delegate.finalizeComputation(object);
+  }
+
+  @Override
+  public String getName()
+  {
+    return delegate.getName();
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return delegate.requiredFields();
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return delegate.getTypeName();
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return delegate.getMaxIntermediateSize();
+  }
+
+  @Override
+  public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    // we are already the result of an optimizeForSegment() call
+    return this;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    CacheKeyBuilder cacheKeyBuilder = new CacheKeyBuilder(AggregatorUtil.SUPPRESSED_AGG_CACHE_TYPE_ID);
+    cacheKeyBuilder.appendCacheable(delegate);
+    return cacheKeyBuilder.build();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SuppressedAggregatorFactory that = (SuppressedAggregatorFactory) o;
+    return Objects.equals(getDelegate(), that.getDelegate());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(getDelegate());
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SuppressedAggregatorFactory{" +
+           "delegate=" + delegate +
+           '}';
+  }
+
+  public AggregatorFactory getDelegate()
+  {
+    return delegate;
+  }
+
+  public static class SuppressedAggregator implements Aggregator
+  {
+    private final Aggregator delegate;
+
+    public SuppressedAggregator(
+        Aggregator delegate
+    )
+    {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void aggregate()
+    {
+      //no-op
+    }
+
+    @Nullable
+    @Override
+    public Object get()
+    {
+      return delegate.get();
+    }
+
+    @Override
+    public float getFloat()
+    {
+      return delegate.getFloat();
+    }
+
+    @Override
+    public long getLong()
+    {
+      return delegate.getLong();
+    }
+
+    @Override
+    public double getDouble()
+    {
+      return delegate.getDouble();
+    }
+
+    @Override
+    public boolean isNull()
+    {
+      return delegate.isNull();
+    }
+
+    @Override
+    public void close()
+    {
+      delegate.close();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SuppressedAggregator that = (SuppressedAggregator) o;
+      return Objects.equals(getDelegate(), that.getDelegate());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(getDelegate());
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SuppressedAggregator{" +
+             "delegate=" + delegate +
+             '}';
+    }
+
+    public Aggregator getDelegate()
+    {
+      return delegate;
+    }
+  }
+
+  public static class SuppressedBufferAggregator implements BufferAggregator
+  {
+    private final BufferAggregator delegate;
+
+    public SuppressedBufferAggregator(
+        BufferAggregator delegate
+    )
+    {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void init(ByteBuffer buf, int position)
+    {
+      delegate.init(buf, position);
+    }
+
+    @Override
+    public void aggregate(ByteBuffer buf, int position)
+    {
+      // no-op
+    }
+
+    @Override
+    public Object get(ByteBuffer buf, int position)
+    {
+      return delegate.get(buf, position);
+    }
+
+    @Override
+    public float getFloat(ByteBuffer buf, int position)
+    {
+      return delegate.getFloat(buf, position);
+    }
+
+    @Override
+    public long getLong(ByteBuffer buf, int position)
+    {
+      return delegate.getLong(buf, position);
+    }
+
+    @Override
+    public double getDouble(ByteBuffer buf, int position)
+    {
+      return delegate.getDouble(buf, position);
+    }
+
+    @Override
+    public void close()
+    {
+      delegate.close();
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      delegate.inspectRuntimeShape(inspector);
+    }
+
+    @Override
+    public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+    {
+      delegate.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
+    }
+
+    @Override
+    public boolean isNull(ByteBuffer buf, int position)
+    {
+      return delegate.isNull(buf, position);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SuppressedBufferAggregator that = (SuppressedBufferAggregator) o;
+      return Objects.equals(getDelegate(), that.getDelegate());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(getDelegate());
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SuppressedBufferAggregator{" +
+             "delegate=" + delegate +
+             '}';
+    }
+
+    public BufferAggregator getDelegate()
+    {
+      return delegate;
+    }
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
index 2fac18d..dcbb03f 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
@@ -28,6 +28,7 @@ import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.BaseQuery;
 import io.druid.query.DataSource;
 import io.druid.query.Druids;
+import io.druid.query.PerSegmentQueryOptimizationContext;
 import io.druid.query.Queries;
 import io.druid.query.Query;
 import io.druid.query.Result;
@@ -37,6 +38,7 @@ import io.druid.query.filter.DimFilter;
 import io.druid.query.spec.QuerySegmentSpec;
 import io.druid.segment.VirtualColumns;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -154,6 +156,12 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
   }
 
   @Override
+  public Query<Result<TimeseriesResultValue>> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return Druids.TimeseriesQueryBuilder.copy(this).aggregators(optimizeAggs(optimizationContext)).build();
+  }
+
+  @Override
   public TimeseriesQuery withOverriddenContext(Map<String, Object> contextOverrides)
   {
     Map<String, Object> newContext = computeOverriddenContext(getContext(), contextOverrides);
@@ -170,6 +178,15 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
     return Druids.TimeseriesQueryBuilder.copy(this).postAggregators(postAggregatorSpecs).build();
   }
 
+  private List<AggregatorFactory> optimizeAggs(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    List<AggregatorFactory> optimizedAggs = new ArrayList<>();
+    for (AggregatorFactory aggregatorFactory : aggregatorSpecs) {
+      optimizedAggs.add(aggregatorFactory.optimizeForSegment(optimizationContext));
+    }
+    return optimizedAggs;
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java
index 3726b7b..6897cc5 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.BaseQuery;
 import io.druid.query.DataSource;
+import io.druid.query.PerSegmentQueryOptimizationContext;
 import io.druid.query.Queries;
 import io.druid.query.Query;
 import io.druid.query.Result;
@@ -36,6 +37,7 @@ import io.druid.query.filter.DimFilter;
 import io.druid.query.spec.QuerySegmentSpec;
 import io.druid.segment.VirtualColumns;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -183,6 +185,12 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
     return new TopNQueryBuilder(this).dataSource(dataSource).build();
   }
 
+  @Override
+  public Query<Result<TopNResultValue>> optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    return new TopNQueryBuilder(this).aggregators(optimizeAggs(optimizationContext)).build();
+  }
+
   public TopNQuery withThreshold(int threshold)
   {
     return new TopNQueryBuilder(this).threshold(threshold).build();
@@ -252,4 +260,13 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
         postAggregatorSpecs
     );
   }
+
+  private List<AggregatorFactory> optimizeAggs(PerSegmentQueryOptimizationContext optimizationContext)
+  {
+    List<AggregatorFactory> optimizedAggs = new ArrayList<>();
+    for (AggregatorFactory aggregatorFactory : aggregatorSpecs) {
+      optimizedAggs.add(aggregatorFactory.optimizeForSegment(optimizationContext));
+    }
+    return optimizedAggs;
+  }
 }
diff --git a/processing/src/test/java/io/druid/query/topn/PerSegmentQueryOptimizeTest.java b/processing/src/test/java/io/druid/query/topn/PerSegmentQueryOptimizeTest.java
new file mode 100644
index 0000000..06a7f46
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/topn/PerSegmentQueryOptimizeTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.topn;
+
+import io.druid.java.util.common.Intervals;
+import io.druid.query.PerSegmentQueryOptimizationContext;
+import io.druid.query.SegmentDescriptor;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.FilteredAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.SuppressedAggregatorFactory;
+import io.druid.query.filter.IntervalDimFilter;
+import io.druid.segment.column.Column;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class PerSegmentQueryOptimizeTest
+{
+  @Test
+  public void testFilteredAggregatorOptimize()
+  {
+    LongSumAggregatorFactory longSumAggregatorFactory = new LongSumAggregatorFactory("test", "test");
+
+    FilteredAggregatorFactory aggregatorFactory = new FilteredAggregatorFactory(
+        longSumAggregatorFactory,
+        new IntervalDimFilter(
+            Column.TIME_COLUMN_NAME,
+            Collections.singletonList(Intervals.utc(1000, 2000)),
+            null
+        )
+    );
+
+    Interval exclude = Intervals.utc(2000, 3000);
+    Interval include = Intervals.utc(1500, 1600);
+    Interval partial = Intervals.utc(1500, 2500);
+
+    AggregatorFactory excludedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(exclude));
+    AggregatorFactory expectedSuppressedAgg = new SuppressedAggregatorFactory(longSumAggregatorFactory);
+    Assert.assertEquals(expectedSuppressedAgg, excludedAgg);
+
+    AggregatorFactory includedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(include));
+    Assert.assertEquals(longSumAggregatorFactory, includedAgg);
+
+    AggregatorFactory partialAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(partial));
+    AggregatorFactory expectedPartialFilteredAgg = new FilteredAggregatorFactory(
+        longSumAggregatorFactory,
+        new IntervalDimFilter(
+            Column.TIME_COLUMN_NAME,
+            Collections.singletonList(Intervals.utc(1500, 2000)),
+            null
+        )
+    );
+    Assert.assertEquals(expectedPartialFilteredAgg, partialAgg);
+  }
+
+  @Test
+  public void testFilteredAggregatorDontOptimizeOnNonTimeColumn()
+  {
+    // Filter is not on __time, so no optimizations should be made.
+    LongSumAggregatorFactory longSumAggregatorFactory = new LongSumAggregatorFactory("test", "test");
+
+    FilteredAggregatorFactory aggregatorFactory = new FilteredAggregatorFactory(
+        longSumAggregatorFactory,
+        new IntervalDimFilter(
+            "not_time",
+            Collections.singletonList(Intervals.utc(1000, 2000)),
+            null
+        )
+    );
+
+    Interval exclude = Intervals.utc(2000, 3000);
+    Interval include = Intervals.utc(1500, 1600);
+    Interval partial = Intervals.utc(1500, 2500);
+
+    AggregatorFactory excludedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(exclude));
+    Assert.assertEquals(aggregatorFactory, excludedAgg);
+
+    AggregatorFactory includedAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(include));
+    Assert.assertEquals(aggregatorFactory, includedAgg);
+
+    AggregatorFactory partialAgg = aggregatorFactory.optimizeForSegment(getOptimizationContext(partial));
+    Assert.assertEquals(aggregatorFactory, partialAgg);
+  }
+
+  private PerSegmentQueryOptimizationContext getOptimizationContext(Interval segmentInterval)
+  {
+    return new PerSegmentQueryOptimizationContext(
+        new SegmentDescriptor(segmentInterval, "0", 0)
+    );
+  }
+}
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 0a63303..aac2e86 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -39,6 +39,8 @@ import io.druid.query.DataSource;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.MetricsEmittingQueryRunner;
 import io.druid.query.NoopQueryRunner;
+import io.druid.query.PerSegmentOptimizingQueryRunner;
+import io.druid.query.PerSegmentQueryOptimizationContext;
 import io.druid.query.Query;
 import io.druid.query.QueryMetrics;
 import io.druid.query.QueryRunner;
@@ -280,38 +282,54 @@ public class ServerManager implements QuerySegmentWalker
   {
     SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
     String segmentId = adapter.getIdentifier();
+
+    MetricsEmittingQueryRunner metricsEmittingQueryRunnerInner = new MetricsEmittingQueryRunner<>(
+        emitter,
+        toolChest,
+        new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor),
+        QueryMetrics::reportSegmentTime,
+        queryMetrics -> queryMetrics.segment(segmentId)
+    );
+
+    CachingQueryRunner cachingQueryRunner = new CachingQueryRunner<>(
+        segmentId,
+        segmentDescriptor,
+        objectMapper,
+        cache,
+        toolChest,
+        metricsEmittingQueryRunnerInner,
+        cachingExec,
+        cacheConfig
+    );
+
+    BySegmentQueryRunner bySegmentQueryRunner = new BySegmentQueryRunner<>(
+        segmentId,
+        adapter.getDataInterval().getStart(),
+        cachingQueryRunner
+    );
+
+    MetricsEmittingQueryRunner metricsEmittingQueryRunnerOuter = new MetricsEmittingQueryRunner<>(
+        emitter,
+        toolChest,
+        bySegmentQueryRunner,
+        QueryMetrics::reportSegmentAndCacheTime,
+        queryMetrics -> queryMetrics.segment(segmentId)
+    ).withWaitMeasuredFromNow();
+
+    SpecificSegmentQueryRunner specificSegmentQueryRunner = new SpecificSegmentQueryRunner<>(
+        metricsEmittingQueryRunnerOuter,
+        segmentSpec
+    );
+
+    PerSegmentOptimizingQueryRunner perSegmentOptimizingQueryRunner = new PerSegmentOptimizingQueryRunner<>(
+        specificSegmentQueryRunner,
+        new PerSegmentQueryOptimizationContext(segmentDescriptor)
+    );
+
     return new SetAndVerifyContextQueryRunner<>(
         serverConfig,
         CPUTimeMetricQueryRunner.safeBuild(
-            new SpecificSegmentQueryRunner<>(
-                new MetricsEmittingQueryRunner<>(
-                    emitter,
-                    toolChest,
-                    new BySegmentQueryRunner<>(
-                        segmentId,
-                        adapter.getDataInterval().getStart(),
-                        new CachingQueryRunner<>(
-                            segmentId,
-                            segmentDescriptor,
-                            objectMapper,
-                            cache,
-                            toolChest,
-                            new MetricsEmittingQueryRunner<>(
-                                emitter,
-                                toolChest,
-                                new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor),
-                                QueryMetrics::reportSegmentTime,
-                                queryMetrics -> queryMetrics.segment(segmentId)
-                            ),
-                            cachingExec,
-                            cacheConfig
-                        )
-                    ),
-                    QueryMetrics::reportSegmentAndCacheTime,
-                    queryMetrics -> queryMetrics.segment(segmentId)
-                ).withWaitMeasuredFromNow(),
-                segmentSpec
-            ),
+            perSegmentOptimizingQueryRunner,
             toolChest,
             emitter,
             cpuTimeAccumulator,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org