Posted to commits@druid.apache.org by ji...@apache.org on 2019/04/27 00:08:00 UTC
[incubator-druid] branch master updated: Contributing
Moving-Average Query to open source. (#6430)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f02251a Contributing Moving-Average Query to open source. (#6430)
f02251a is described below
commit f02251ab2d6e1b4d13bf78415c9c7528a7a9c063
Author: Eyal Yurman <ey...@gmail.com>
AuthorDate: Fri Apr 26 17:07:48 2019 -0700
Contributing Moving-Average Query to open source. (#6430)
* Contributing Moving-Average Query to open source.
* Fix failing code inspections.
* See if explicit types will invoke the correct comparison function.
* Explicitly remove support for druid.generic.useDefaultValueForNull configuration parameter.
* Update styling and headers for compliance.
* Refresh code with latest master changes:
* Remove NullDimensionSelector.
* Apply changes of RequestLogger.
* Apply changes of TimelineServerView.
* Small checkstyle fix.
* Checkstyle fixes.
* Fixing rat errors; Teamcity errors.
* Removing support theta sketches. Will be added back in this pr or a following once DI conflicts with datasketches are resolved.
* Implements some of the review fixes.
* More fixes for review.
* More fixes from review.
* MapBasedRow is Unmodifiable. Create new rows instead of modifying existing ones.
* Remove more changes related to datasketches support.
* Refactor BaseAverager startFrom field and add a comment.
* fakeEvents field: Refactor initialization and add comment.
* Rename parameters (tiny change).
* Fix variable name typo in test (JAN_4).
* Fix styling of non camelCase fields.
* Fix Preconditions.checkArgument for cycleSize.
* Add more documentation to RowBucketIterable and other classes.
* key/value comment in MovingAverageIterable.
* Fix anonymous makeColumnValueSelector returning null.
* Replace IdentityYieldingAccumolator with Yielders.each().
* internalNext() should return null instead of throwing exception.
* Remove unused variables/parameters.
* Harden MovingAverageIterableTest (Switch anyOf to exact match).
* Change internalNext() from recursion to iteration; Simplify next() and hasNext().
* Remove unused imports.
* Address review comments.
* Rename fakeEvents to emptyEvents.
* Remove redundant parameter key from computeMovingAverage.
* Check yielder as well in RowBucketIterable#hasNext()
* Fix javadoc.
---
distribution/pom.xml | 3 +-
.../extensions-contrib/moving-average-query.md | 337 +++++++++
docs/content/development/extensions.md | 1 +
extensions-contrib/moving-average-query/README.md | 29 +
extensions-contrib/moving-average-query/pom.xml | 83 +++
.../movingaverage/AveragerFactoryWrapper.java | 179 +++++
.../query/movingaverage/BucketingAccumulator.java | 68 ++
.../DefaultMovingAverageQueryMetrics.java | 62 ++
.../DefaultMovingAverageQueryMetricsFactory.java | 59 ++
.../query/movingaverage/MovingAverageHelper.java | 53 ++
.../query/movingaverage/MovingAverageIterable.java | 312 ++++++++
.../query/movingaverage/MovingAverageQuery.java | 378 ++++++++++
.../movingaverage/MovingAverageQueryMetrics.java | 42 ++
.../MovingAverageQueryMetricsFactory.java | 36 +
.../movingaverage/MovingAverageQueryModule.java | 61 ++
.../movingaverage/MovingAverageQueryRunner.java | 235 ++++++
.../movingaverage/MovingAverageQueryToolChest.java | 135 ++++
.../PostAveragerAggregatorCalculator.java | 63 ++
.../druid/query/movingaverage/RowBucket.java | 62 ++
.../query/movingaverage/RowBucketIterable.java | 155 ++++
.../query/movingaverage/averagers/Averager.java | 57 ++
.../movingaverage/averagers/AveragerFactory.java | 106 +++
.../movingaverage/averagers/BaseAverager.java | 192 +++++
.../averagers/BaseAveragerFactory.java | 103 +++
.../averagers/ComparableAveragerFactory.java | 51 ++
.../movingaverage/averagers/ConstantAverager.java | 81 +++
.../averagers/ConstantAveragerFactory.java | 101 +++
.../movingaverage/averagers/DoubleMaxAverager.java | 44 ++
.../averagers/DoubleMaxAveragerFactory.java | 44 ++
.../averagers/DoubleMeanAverager.java | 48 ++
.../averagers/DoubleMeanAveragerFactory.java | 44 ++
.../averagers/DoubleMeanNoNullAverager.java | 46 ++
.../averagers/DoubleMeanNoNullAveragerFactory.java | 43 ++
.../movingaverage/averagers/DoubleMinAverager.java | 44 ++
.../averagers/DoubleMinAveragerFactory.java | 43 ++
.../movingaverage/averagers/LongMaxAverager.java | 44 ++
.../averagers/LongMaxAveragerFactory.java | 43 ++
.../movingaverage/averagers/LongMeanAverager.java | 48 ++
.../averagers/LongMeanAveragerFactory.java | 44 ++
.../averagers/LongMeanNoNullAverager.java | 46 ++
.../averagers/LongMeanNoNullAveragerFactory.java | 44 ++
.../movingaverage/averagers/LongMinAverager.java | 45 ++
.../averagers/LongMinAveragerFactory.java | 44 ++
.../org.apache.druid.initialization.DruidModule | 16 +
.../movingaverage/MovingAverageIterableTest.java | 803 +++++++++++++++++++++
.../movingaverage/MovingAverageQueryTest.java | 420 +++++++++++
.../PostAveragerAggregatorCalculatorTest.java | 109 +++
.../query/movingaverage/RowBucketIterableTest.java | 670 +++++++++++++++++
.../averagers/BaseAveragerFactoryTest.java | 68 ++
.../movingaverage/averagers/BaseAveragerTest.java | 156 ++++
.../averagers/DoubleMaxAveragerFactoryTest.java | 38 +
.../averagers/DoubleMaxAveragerTest.java | 57 ++
.../averagers/DoubleMeanAveragerFactoryTest.java | 37 +
.../averagers/DoubleMeanAveragerTest.java | 58 ++
.../DoubleMeanAveragerWithPeriodTest.java | 81 +++
.../DoubleMeanNoNullAveragerFactoryTest.java | 37 +
.../averagers/DoubleMeanNoNullAveragerTest.java | 82 +++
.../averagers/DoubleMinAveragerFactoryTest.java | 37 +
.../averagers/DoubleMinAveragerTest.java | 58 ++
.../averagers/LongMaxAveragerFactoryTest.java | 37 +
.../averagers/LongMaxAveragerTest.java | 57 ++
.../averagers/LongMeanAveragerFactoryTest.java | 37 +
.../averagers/LongMeanAveragerTest.java | 57 ++
.../LongMeanNoNullAveragerFactoryTest.java | 37 +
.../averagers/LongMeanNoNullAveragerTest.java | 57 ++
.../averagers/LongMinAveragerFactoryTest.java | 37 +
.../averagers/LongMinAveragerTest.java | 58 ++
.../druid/query/movingaverage/test/TestConfig.java | 35 +
.../queryTests/basicGroupByMovingAverage.yaml | 57 ++
.../queryTests/basicGroupByMovingAverage2.yaml | 57 ++
.../queryTests/basicTimeseriesMovingAverage.yaml | 51 ++
.../resources/queryTests/missingGroupByValues.yaml | 78 ++
.../resources/queryTests/sortingAveragersAsc.yaml | 81 +++
.../resources/queryTests/sortingAveragersDesc.yaml | 82 +++
.../sortingWithNonMovingAndMovingAvgMetric.yaml | 84 +++
.../queryTests/sortingWithNonMovingAvgMetric.yaml | 82 +++
pom.xml | 3 +-
77 files changed, 7470 insertions(+), 2 deletions(-)
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 5f87081..a6f22f2 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -333,7 +333,8 @@
<argument>org.apache.druid.extensions.contrib:druid-time-min-max</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-virtual-columns</argument>
- </arguments>
+ <argument>-c</argument>
+ <argument>org.apache.druid.extensions.contrib:druid-moving-average-query</argument> </arguments>
</configuration>
</execution>
</executions>
diff --git a/docs/content/development/extensions-contrib/moving-average-query.md b/docs/content/development/extensions-contrib/moving-average-query.md
new file mode 100644
index 0000000..5fc7268
--- /dev/null
+++ b/docs/content/development/extensions-contrib/moving-average-query.md
@@ -0,0 +1,337 @@
+---
+layout: doc_page
+---
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+# Moving Average Queries
+
+## Overview
+**Moving Average Query** is an extension which provides support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.
+
+These Aggregate Window Functions consume standard Druid Aggregators and output additional windowed aggregates called [Averagers](#averagers).
+
+#### High level algorithm
+
+Moving Average encapsulates the [groupBy query](../../querying/groupbyquery.html) (or [timeseries](../../querying/timeseriesquery.html) when no dimensions are specified) in order to rely on the maturity of these query types.
+
+It runs the query in two main phases:
+1. Runs an inner [groupBy](../../querying/groupbyquery.html) or [timeseries](../../querying/timeseriesquery.html) query to compute Aggregators (e.g. a daily count of events).
+2. Passes over the aggregated results in the Broker in order to compute Averagers (e.g. a moving 7-day average of the daily count).
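
The second phase can be pictured as a sliding window over the per-bucket aggregates. The Java sketch below is an illustration only, not the extension's implementation: `TrailingMeanSketch` and `trailingMean` are hypothetical names, and the sketch assumes that lookback buckets with no data contribute zero. That assumption is consistent with the `longMean` results in the basic example on this page, where the first record is 30490 / 7.

```java
// Hypothetical illustration; these names do not exist in the extension.
final class TrailingMeanSketch
{
  /**
   * Computes a trailing mean over per-bucket aggregates (phase two).
   * Lookback buckets that fall before the first input count as zero,
   * so the divisor is always the configured number of buckets.
   */
  static double[] trailingMean(long[] perBucketAggregates, int buckets)
  {
    if (buckets <= 0) {
      throw new IllegalArgumentException("buckets must be > 0");
    }
    double[] result = new double[perBucketAggregates.length];
    long windowSum = 0;
    for (int i = 0; i < perBucketAggregates.length; i++) {
      // Add the current bucket and drop the one that left the window.
      windowSum += perBucketAggregates[i];
      if (i >= buckets) {
        windowSum -= perBucketAggregates[i - buckets];
      }
      result[i] = (double) windowSum / buckets;
    }
    return result;
  }

  public static void main(String[] args)
  {
    // First two 30-minute deltas taken from the basic example on this page.
    double[] averages = trailingMean(new long[]{30490L, 96526L}, 7);
    for (double average : averages) {
      System.out.println(average);
    }
  }
}
```

Because the aggregates are computed once and then reused for every window, the segments are scanned a single time, which is the performance point made in the enhancements list.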
+
+#### Main enhancements provided by this extension:
+1. Functionality: Extending Druid query functionality (i.e. an initial introduction of Window Functions).
+2. Performance: Improving performance of such moving aggregations by eliminating multiple segment scans.
+
+#### Further reading
+[Moving Average](https://en.wikipedia.org/wiki/Moving_average)
+
+[Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions)
+
+[Analytic Functions](https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts)
+
+
+## Operations
+To use this extension, make sure to [load](../../operations/including-extensions.html) `druid-moving-average-query` on the Broker only.
+
+## Configuration
+There are currently no configuration properties specific to Moving Average.
+
+## Limitations
+* movingAverage is missing support for the following groupBy properties: `subtotalsSpec`, `virtualColumns`.
+* movingAverage is missing support for the following timeseries properties: `descending`.
+* movingAverage is missing support for [SQL-compatible null handling](https://github.com/apache/incubator-druid/issues/4349) (so setting `druid.generic.useDefaultValueForNull` in the configuration will result in an error).
+
+## Query spec
+* Most properties in the query spec are derived from [groupBy query](../../querying/groupbyquery.html) / [timeseries](../../querying/timeseriesquery.html); see the documentation for these query types.
+
+|property|description|required?|
+|--------|-----------|---------|
+|queryType|This String should always be "movingAverage"; this is the first thing Druid looks at to figure out how to interpret the query.|yes|
+|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../../querying/datasource.html) for more information.|yes|
+|dimensions|A JSON list of [DimensionSpec](../../querying/dimensionspecs.html) (note that this property is optional)|no|
+|limitSpec|See [LimitSpec](../../querying/limitspec.html)|no|
+|having|See [Having](../../querying/having.html)|no|
+|granularity|A period granularity; See [Period Granularities](../../querying/granularities.html#period-granularities)|yes|
+|filter|See [Filters](../../querying/filters.html)|no|
+|aggregations|Aggregations form the input to Averagers; See [Aggregations](../../querying/aggregations.html)|yes|
+|postAggregations|Supports only aggregations as input; See [Post Aggregations](../../querying/post-aggregations.html)|no|
+|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
+|context|An additional JSON Object which can be used to specify certain flags.|no|
+|averagers|Defines the moving average function; See [Averagers](#averagers)|yes|
+|postAveragers|Supports both averagers and aggregations as input; Syntax is identical to postAggregations (See [Post Aggregations](../../querying/post-aggregations.html))|no|
+
+## Averagers
+
+Averagers are used to define the Moving-Average function. Averagers are not limited to an average - they can also provide other types of window functions such as MAX()/MIN().
+
+### Properties
+
+These are properties which are common to all Averagers:
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|Averager type; See [Averager types](#averager-types)|yes|
+|name|Averager name|yes|
+|fieldName|Input name (An aggregation name)|yes|
+|buckets|Number of lookback buckets (time periods), including current one. Must be >0|yes|
+|cycleSize|Cycle size; Used to calculate day-of-week option; See [Cycle size (Day of Week)](#cycle-size-day-of-week)|no, defaults to 1|
+
+
+### Averager types:
+
+* [Standard averagers](#standard-averagers):
+ * doubleMean
+ * doubleMeanNoNulls
+ * doubleMax
+ * doubleMin
+ * longMean
+ * longMeanNoNulls
+ * longMax
+ * longMin
+
+#### Standard averagers
+
+These averagers offer four functions:
+* Mean (Average)
+* MeanNoNulls (Ignores empty buckets).
+* Max
+* Min
+
+**Ignoring nulls**:
+Using a MeanNoNulls averager is useful when the interval starts at the beginning of the dataset.
+In that case, the first records ignore the missing buckets, so the average is not artificially low.
+However, this also means that empty days in a sparse dataset are ignored as well.
+
+Example of usage:
+```json
+{ "type" : "doubleMean", "name" : <output_name>, "fieldName": <input_name>, "buckets": <number_of_buckets> }
+```
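
The difference between the Mean and MeanNoNulls variants comes down to the divisor. The following is a minimal Java sketch, assuming a missing bucket is represented as `null`; the class and method names are hypothetical, and returning `0.0` for a window with no data is a choice made for this sketch rather than documented behavior.

```java
// Hypothetical illustration; these names do not exist in the extension.
final class MeanVariantsSketch
{
  /** Mean: empty buckets count as zero and still enlarge the divisor. */
  static double mean(Long[] window)
  {
    if (window.length == 0) {
      throw new IllegalArgumentException("window must not be empty");
    }
    long sum = 0;
    for (Long value : window) {
      if (value != null) {
        sum += value;
      }
    }
    return (double) sum / window.length;
  }

  /** MeanNoNulls: empty buckets are skipped and do not enlarge the divisor. */
  static double meanNoNulls(Long[] window)
  {
    long sum = 0;
    int nonNullBuckets = 0;
    for (Long value : window) {
      if (value != null) {
        sum += value;
        nonNullBuckets++;
      }
    }
    // Assumption of this sketch: a window without any data yields 0.0.
    return nonNullBuckets == 0 ? 0.0 : (double) sum / nonNullBuckets;
  }
}
```

For a 7-bucket window holding only the two values 30490 and 96526, `mean` divides their sum by 7 while `meanNoNulls` divides it by 2, which is why the latter is not pulled down at the start of a dataset.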
+
+### Cycle size (Day of Week)
+This optional parameter is used to calculate over a single bucket within each cycle instead of over all buckets.
+A prime example is a weekly cycle over daily buckets, resulting in a Day of Week calculation (other examples: month of year, hour of day).
+
+For example, when using these parameters:
+* *granularity*: period=P1D (daily)
+* *buckets*: 28
+* *cycleSize*: 7
+
+Within each output record, the averager will compute the result over the following buckets: current (#0), #7, #14, #21.
+Whereas without specifying cycleSize it would have computed over all 28 buckets.
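
The bucket selection described above can be sketched in a few lines of Java. This is an illustration under stated assumptions, not the extension's code: `CycleSizeSketch` and `lookbackOffsets` are hypothetical names, and offset 0 denotes the current bucket with larger offsets going further back in time.

```java
// Hypothetical illustration; these names do not exist in the extension.
final class CycleSizeSketch
{
  /**
   * Returns the lookback offsets (0 = current bucket) that an averager
   * would read for the given number of buckets and cycle size.
   */
  static int[] lookbackOffsets(int buckets, int cycleSize)
  {
    if (buckets <= 0 || cycleSize <= 0) {
      throw new IllegalArgumentException("buckets and cycleSize must be > 0");
    }
    // One bucket per cycle, rounded up when buckets is not a multiple of cycleSize.
    int count = (buckets + cycleSize - 1) / cycleSize;
    int[] offsets = new int[count];
    for (int i = 0; i < count; i++) {
      offsets[i] = i * cycleSize;
    }
    return offsets;
  }
}
```

With `buckets` = 28 and `cycleSize` = 7 this yields the offsets 0, 7, 14 and 21; with the default `cycleSize` of 1 it yields every offset from 0 to 27.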
+
+## Examples
+
+All examples are based on the Wikipedia dataset provided in the Druid [tutorials](../../tutorials/index.html).
+
+### Basic example
+
+Calculating a 7-bucket moving average for Wikipedia edit deltas.
+
+Query syntax:
+```json
+{
+ "queryType": "movingAverage",
+ "dataSource": "wikipedia",
+ "granularity": {
+ "type": "period",
+ "period": "PT30M"
+ },
+ "intervals": [
+ "2015-09-12T00:00:00Z/2015-09-13T00:00:00Z"
+ ],
+ "aggregations": [
+ {
+ "name": "delta30Min",
+ "fieldName": "delta",
+ "type": "longSum"
+ }
+ ],
+ "averagers": [
+ {
+ "name": "trailing30MinChanges",
+ "fieldName": "delta30Min",
+ "type": "longMean",
+ "buckets": 7
+ }
+ ]
+}
+```
+
+Result:
+```json
+[ {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T00:30:00.000Z",
+ "event" : {
+ "delta30Min" : 30490,
+ "trailing30MinChanges" : 4355.714285714285
+ }
+ }, {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T01:00:00.000Z",
+ "event" : {
+ "delta30Min" : 96526,
+ "trailing30MinChanges" : 18145.14285714286
+ }
+ }, {
+...
+...
+...
+}, {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T23:00:00.000Z",
+ "event" : {
+ "delta30Min" : 119100,
+ "trailing30MinChanges" : 198697.2857142857
+ }
+}, {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T23:30:00.000Z",
+ "event" : {
+ "delta30Min" : 177882,
+ "trailing30MinChanges" : 193890.0
+ }
+} ]
+```
+
+### Post averager example
+
+Calculating a 7-bucket moving average for Wikipedia edit deltas, plus the ratio between the current period and the moving average.
+
+Query syntax:
+```json
+{
+ "queryType": "movingAverage",
+ "dataSource": "wikipedia",
+ "granularity": {
+ "type": "period",
+ "period": "PT30M"
+ },
+ "intervals": [
+ "2015-09-12T22:00:00Z/2015-09-13T00:00:00Z"
+ ],
+ "aggregations": [
+ {
+ "name": "delta30Min",
+ "fieldName": "delta",
+ "type": "longSum"
+ }
+ ],
+ "averagers": [
+ {
+ "name": "trailing30MinChanges",
+ "fieldName": "delta30Min",
+ "type": "longMean",
+ "buckets": 7
+ }
+ ],
+ "postAveragers" : [
+ {
+ "name": "ratioTrailing30MinChanges",
+ "type": "arithmetic",
+ "fn": "/",
+ "fields": [
+ {
+ "type": "fieldAccess",
+ "fieldName": "delta30Min"
+ },
+ {
+ "type": "fieldAccess",
+ "fieldName": "trailing30MinChanges"
+ }
+ ]
+ }
+ ]
+}
+```
+
+Result:
+```json
+[ {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T22:00:00.000Z",
+ "event" : {
+ "delta30Min" : 144269,
+ "trailing30MinChanges" : 204088.14285714287,
+ "ratioTrailing30MinChanges" : 0.7068955500319539
+ }
+}, {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T22:30:00.000Z",
+ "event" : {
+ "delta30Min" : 242860,
+ "trailing30MinChanges" : 214031.57142857142,
+ "ratioTrailing30MinChanges" : 1.134692411867141
+ }
+}, {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T23:00:00.000Z",
+ "event" : {
+ "delta30Min" : 119100,
+ "trailing30MinChanges" : 198697.2857142857,
+ "ratioTrailing30MinChanges" : 0.5994042624782422
+ }
+}, {
+ "version" : "v1",
+ "timestamp" : "2015-09-12T23:30:00.000Z",
+ "event" : {
+ "delta30Min" : 177882,
+ "trailing30MinChanges" : 193890.0,
+ "ratioTrailing30MinChanges" : 0.9174377224199288
+ }
+} ]
+```
+
+
+### Cycle size example
+
+Calculating an average of the first 10 minutes of each hour over the last 3 hours:
+
+Query syntax:
+```json
+{
+ "queryType": "movingAverage",
+ "dataSource": "wikipedia",
+ "granularity": {
+ "type": "period",
+ "period": "PT10M"
+ },
+ "intervals": [
+ "2015-09-12T00:00:00Z/2015-09-13T00:00:00Z"
+ ],
+ "aggregations": [
+ {
+ "name": "delta10Min",
+ "fieldName": "delta",
+ "type": "doubleSum"
+ }
+ ],
+ "averagers": [
+ {
+ "name": "trailing10MinPerHourChanges",
+ "fieldName": "delta10Min",
+ "type": "doubleMeanNoNulls",
+ "buckets": 18,
+ "cycleSize": 6
+ }
+ ]
+}
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 5435705..c5f67bb 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -95,6 +95,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)|
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)|
+|druid-moving-average-query|Support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.|[link](../development/extensions-contrib/moving-average-query.html)|
## Promoting Community Extension to Core Extension
diff --git a/extensions-contrib/moving-average-query/README.md b/extensions-contrib/moving-average-query/README.md
new file mode 100644
index 0000000..33156e7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/README.md
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+druid-moving-average-query
+=============
+
+Overview
+=============
+**Moving Average Query** is an extension which provides support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.
+
+Documentation
+=============
+See the druid.io website or the documentation in the [Druid GitHub repo](https://github.com/apache/incubator-druid/tree/master/docs/content/development/extensions-contrib/moving-average-query.md).
diff --git a/extensions-contrib/moving-average-query/pom.xml b/extensions-contrib/moving-average-query/pom.xml
new file mode 100644
index 0000000..ae6f68a
--- /dev/null
+++ b/extensions-contrib/moving-average-query/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+
+ <parent>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid</artifactId>
+ <version>0.15.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.druid.extensions.contrib</groupId>
+ <artifactId>druid-moving-average-query</artifactId>
+ <name>druid-moving-average-query</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jmockit</groupId>
+ <artifactId>jmockit</artifactId>
+ <version>1.25</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <version>2.8.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-core</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/AveragerFactoryWrapper.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/AveragerFactoryWrapper.java
new file mode 100644
index 0000000..f6f1d90
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/AveragerFactoryWrapper.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A wrapper around averagers that makes them appear to be aggregators.
+ * This is necessary purely to allow existing common druid code that only knows
+ * about aggregators to work with the MovingAverageQuery query as well.
+ *
+ * NOTE: The {@link AggregatorFactory} abstract class is only partially extended.
+ * Most methods are not implemented and throw {@link UnsupportedOperationException} if called.
+ * This is because these methods are invalid for the AveragerFactoryWrapper.
+ *
+ * @param <T> Result type
+ * @param <R> Finalized Result type
+ */
+public class AveragerFactoryWrapper<T, R> extends AggregatorFactory
+{
+
+ private final AveragerFactory<T, R> af;
+ private final String prefix;
+
+ /**
+ * Simple constructor
+ *
+ * @param af the averager factory to wrap
+ * @param prefix prefix prepended to the averager's name
+ */
+ public AveragerFactoryWrapper(AveragerFactory<T, R> af, String prefix)
+ {
+ this.af = af;
+ this.prefix = prefix;
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory) throws UnsupportedOperationException
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.druid.query.aggregation.AggregatorFactory#getComparator()
+ */
+ @Override
+ public Comparator<?> getComparator()
+ {
+ return af.getComparator();
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public Object combine(Object lhs, Object rhs)
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public Object deserialize(Object object)
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Delegates finalization to the wrapped {@link AveragerFactory}.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object finalizeComputation(Object object)
+ {
+ return af.finalizeComputation((T) object);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.druid.query.aggregation.AggregatorFactory#getName()
+ */
+ @Override
+ public String getName()
+ {
+ return prefix + af.getName();
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public List<String> requiredFields()
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public byte[] getCacheKey()
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public String getTypeName()
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+ /**
+ * Not implemented. Throws UnsupportedOperationException.
+ */
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ throw new UnsupportedOperationException("Invalid operation for AveragerFactoryWrapper.");
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/BucketingAccumulator.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/BucketingAccumulator.java
new file mode 100644
index 0000000..a79e24b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/BucketingAccumulator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Groups all the rows for a specific period together.
+ * Rows of each period are placed in a single {@link RowBucket} (timed through the dateTime field).
+ * (Assumption: Input arrives sorted by timestamp).
+ */
+public class BucketingAccumulator extends YieldingAccumulator<RowBucket, Row>
+{
+
+ /* (non-Javadoc)
+ * @see YieldingAccumulator#accumulate(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public RowBucket accumulate(RowBucket accumulated, Row in)
+ {
+ List<Row> rows;
+
+ if (accumulated == null) {
+ // first row, initializing
+ rows = new ArrayList<>();
+ accumulated = new RowBucket(in.getTimestamp(), rows);
+ } else if (accumulated.getNextBucket() != null) {
+ accumulated = accumulated.getNextBucket();
+ }
+
+ if (!accumulated.getDateTime().equals(in.getTimestamp())) {
+ // timestamp changed: start the next bucket and yield the current one
+ rows = new ArrayList<>();
+ rows.add(in);
+ RowBucket nextBucket = new RowBucket(in.getTimestamp(), rows);
+ accumulated.setNextBucket(nextBucket);
+ yield();
+ } else {
+ // still in the same bucket (same timestamp)
+ rows = accumulated.getRows();
+ rows.add(in);
+ }
+
+ return accumulated;
+ }
+
+}
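
The bucketing idea in BucketingAccumulator can be sketched without any Druid types. This is a minimal illustration, not the code from this patch: `BucketingSketch`, `Event` and `bucketByTimestamp` are hypothetical names, and the sketch returns all buckets at once instead of yielding them one by one. Like the accumulator, it assumes the input is already sorted by timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for Row/RowBucket; not part of the patch.
final class BucketingSketch
{
  static final class Event
  {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload)
    {
      this.timestamp = timestamp;
      this.payload = payload;
    }
  }

  // Groups consecutive events that share a timestamp.
  // Assumes the input list is sorted by timestamp.
  static List<List<Event>> bucketByTimestamp(List<Event> sorted)
  {
    List<List<Event>> buckets = new ArrayList<>();
    List<Event> current = null;
    long currentTimestamp = 0L;

    for (Event e : sorted) {
      if (current == null || e.timestamp != currentTimestamp) {
        // timestamp changed (or first event): open a new bucket
        current = new ArrayList<>();
        buckets.add(current);
        currentTimestamp = e.timestamp;
      }
      current.add(e);
    }
    return buckets;
  }

  public static void main(String[] args)
  {
    List<List<Event>> buckets = bucketByTimestamp(
        Arrays.asList(new Event(1L, "a"), new Event(1L, "b"), new Event(2L, "c"))
    );
    System.out.println(buckets.size());
  }
}
```

If the input were not sorted, the same timestamp could end up in several buckets, which is why the sorted-input assumption matters.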
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/DefaultMovingAverageQueryMetrics.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/DefaultMovingAverageQueryMetrics.java
new file mode 100644
index 0000000..8dced39
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/DefaultMovingAverageQueryMetrics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.query.DefaultQueryMetrics;
+import org.apache.druid.query.DruidMetrics;
+
+public class DefaultMovingAverageQueryMetrics extends DefaultQueryMetrics<MovingAverageQuery> implements
+ MovingAverageQueryMetrics
+{
+
+ public DefaultMovingAverageQueryMetrics(ObjectMapper jsonMapper)
+ {
+ super(jsonMapper);
+ }
+
+ @Override
+ public void query(MovingAverageQuery query)
+ {
+ super.query(query);
+ numDimensions(query);
+ numMetrics(query);
+ numComplexMetrics(query);
+ }
+
+ @Override
+ public void numDimensions(MovingAverageQuery query)
+ {
+ setDimension("numDimensions", String.valueOf(query.getDimensions().size()));
+ }
+
+ @Override
+ public void numMetrics(MovingAverageQuery query)
+ {
+ setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()));
+ }
+
+ @Override
+ public void numComplexMetrics(MovingAverageQuery query)
+ {
+ int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs());
+ setDimension("numComplexMetrics", String.valueOf(numComplexAggs));
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/DefaultMovingAverageQueryMetricsFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/DefaultMovingAverageQueryMetricsFactory.java
new file mode 100644
index 0000000..c9efb8b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/DefaultMovingAverageQueryMetricsFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.jackson.DefaultObjectMapper;
+
+@LazySingleton
+public class DefaultMovingAverageQueryMetricsFactory implements MovingAverageQueryMetricsFactory
+{
+
+ private static final MovingAverageQueryMetricsFactory INSTANCE =
+ new DefaultMovingAverageQueryMetricsFactory(new DefaultObjectMapper());
+
+ /**
+ * Should be used only in tests, directly or indirectly (via {@link
+ * MovingAverageQueryToolChest#MovingAverageQueryToolChest}).
+ */
+ @VisibleForTesting
+ public static MovingAverageQueryMetricsFactory instance()
+ {
+ return INSTANCE;
+ }
+
+ private final ObjectMapper jsonMapper;
+
+ @Inject
+ public DefaultMovingAverageQueryMetricsFactory(@Json ObjectMapper jsonMapper)
+ {
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ public MovingAverageQueryMetrics makeMetrics()
+ {
+ return new DefaultMovingAverageQueryMetrics(jsonMapper);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageHelper.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageHelper.java
new file mode 100644
index 0000000..f0cfb0b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageHelper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.query.dimension.DimensionSpec;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MovingAverageHelper
+{
+
+ /**
+ * @param dimensions A list of DimensionSpec specified in the query
+ * @param row The Row to be used for looking up dimension values
+ *
+ * @return A Map of dimension/value from the row
+ */
+
+ public static Map<String, Object> getDimKeyFromRow(Collection<DimensionSpec> dimensions, Row row)
+ {
+
+ Map<String, Object> key = new HashMap<>();
+ Map<String, Object> event = ((MapBasedRow) row).getEvent();
+
+ for (DimensionSpec dimension : dimensions) {
+ key.put(dimension.getOutputName(), event.get(dimension.getOutputName()));
+ }
+
+ return key;
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
new file mode 100644
index 0000000..b92604d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over per-day {@link RowBucket}s, producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See computeMovingAverage for more details.
+ */
+public class MovingAverageIterable implements Iterable<Row>
+{
+
+ private final Sequence<RowBucket> seq;
+ private final List<DimensionSpec> dims;
+ private final List<AveragerFactory<?, ?>> factories;
+ private final Map<String, PostAggregator> postAggMap;
+ private final Map<String, AggregatorFactory> aggMap;
+ private final Map<String, Object> emptyEvents;
+
+ public MovingAverageIterable(
+ Sequence<RowBucket> buckets,
+ List<DimensionSpec> dims,
+ List<AveragerFactory<?, ?>> factories,
+ List<PostAggregator> postAggList,
+ List<AggregatorFactory> aggList
+ )
+ {
+ this.dims = dims;
+ this.factories = factories;
+ this.seq = buckets;
+
+ postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> postAgg.getName(), postAgg -> postAgg));
+ aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), agg -> agg));
+ emptyEvents = generateEmptyEventsFromAggregators(aggMap, postAggMap);
+ }
+
+ // Build a list of empty events from Aggregators/PostAggregators to be used by Iterator to build fake rows.
+ // These fake rows will be used by computeMovingAverage() in skip=true mode.
+ // See emptyEventsCopy in internalNext() and computeMovingAverage() documentation.
+ private Map<String, Object> generateEmptyEventsFromAggregators(Map<String, AggregatorFactory> aggMap,
+ Map<String, PostAggregator> postAggMap)
+ {
+ Map<String, Object> emptyEvents = new LinkedHashMap<>();
+ aggMap.values().forEach(agg -> {
+ Aggregator aggFactorized = agg.factorize(getEmptyColumnSelectorFactory());
+ emptyEvents.put(agg.getName(), aggFactorized.get());
+ });
+ postAggMap.values().forEach(postAgg -> emptyEvents.put(postAgg.getName(), postAgg.compute(emptyEvents)));
+ return emptyEvents;
+ }
+
+ @Nonnull
+ private ColumnSelectorFactory getEmptyColumnSelectorFactory()
+ {
+ return new ColumnSelectorFactory()
+ {
+ @Override
+ public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+ {
+ // Generating empty records while aggregating on Filtered aggregators requires a dimension selector
+ // for initialization. This dimension selector is not actually used for generating values
+ return DimensionSelector.constant(null);
+ }
+
+ @Override
+ public ColumnValueSelector makeColumnValueSelector(String s)
+ {
+ return NilColumnValueSelector.instance();
+ }
+
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String s)
+ {
+ return null;
+ }
+ };
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator<Row> iterator()
+ {
+ return new MovingAverageIterator(seq, dims, factories, emptyEvents, aggMap);
+ }
+
+ static class MovingAverageIterator implements Iterator<Row>
+ {
+
+ private final List<DimensionSpec> dims;
+ // Key: Row's dimension set. Value: Averager. See MovingAverageIterator#computeMovingAverage for more details.
+ private final Map<Map<String, Object>, List<Averager<?>>> averagers = new HashMap<>();
+ private final List<AveragerFactory<?, ?>> averagerFactories;
+
+ private Yielder<RowBucket> yielder;
+ private RowBucket cache = null;
+ private Iterator<Row> cacheIter;
+ private Iterator<Map<String, Object>> averagersKeysIter;
+ private Set<Map<String, Object>> seenKeys = new HashSet<>();
+ private Row saveNext;
+ private Map<String, AggregatorFactory> aggMap;
+ private Map<String, Object> emptyEvents;
+
+ public MovingAverageIterator(
+ Sequence<RowBucket> rows,
+ List<DimensionSpec> dims,
+ List<AveragerFactory<?, ?>> averagerFactories,
+ Map<String, Object> emptyEvents,
+ Map<String, AggregatorFactory> aggMap
+ )
+ {
+ this.dims = dims;
+ this.averagerFactories = averagerFactories;
+ this.emptyEvents = emptyEvents;
+ this.aggMap = aggMap;
+
+ yielder = Yielders.each(rows);
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext()
+ {
+ if (saveNext != null) {
+ return true;
+ }
+
+ saveNext = internalNext();
+ return (saveNext != null);
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ public Row next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Row retVal = saveNext;
+ saveNext = null;
+ return retVal;
+ }
+
+ private Row internalNext()
+ {
+ // Iterate until there is a row to return or the Yielder is exhausted; in the latter case return null.
+ // This is used in order to skip empty buckets (iterate to the next one).
+ while (true) {
+ if (cache == null && !yielder.isDone()) {
+ cache = yielder.get();
+ yielder = yielder.next(cache);
+
+ cacheIter = cache.getRows().iterator();
+ }
+
+ Row r;
+
+ // return rows from the cached RowBucket
+ if (cacheIter != null) {
+ if (cacheIter.hasNext()) {
+ r = cacheIter.next();
+ // Convert full event (key + metrics) to key
+ Map<String, Object> key = MovingAverageHelper.getDimKeyFromRow(dims, r);
+ seenKeys.add(key);
+ r = computeMovingAverage((MapBasedRow) r, false);
+ if (r != null) {
+ return r;
+ } else {
+ throw new NoSuchElementException();
+ }
+ } else {
+ Set<Map<String, Object>> averagerKeys = new HashSet<>(averagers.keySet());
+ averagerKeys.removeAll(seenKeys);
+ averagersKeysIter = averagerKeys.iterator();
+ cacheIter = null;
+ }
+ }
+
+ // return empty rows for unseen dimension combinations
+ if (averagersKeysIter != null) {
+ while (averagersKeysIter.hasNext()) {
+ Map<String, Object> dims = averagersKeysIter.next();
+ Map<String, Object> emptyEventsCopy = new HashMap<>(emptyEvents);
+
+ // Convert key to a full dummy event (key + dummy metrics).
+ dims.forEach((dim, value) -> emptyEventsCopy.put(dim, value));
+
+ r = computeMovingAverage(new MapBasedRow(cache.getDateTime(), emptyEventsCopy), true);
+ if (r != null) {
+ return r;
+ }
+ }
+
+ seenKeys.clear();
+ averagersKeysIter = null;
+ cache = null;
+ }
+
+ if (cacheIter == null && yielder.isDone()) {
+ // No cached bucket is left and the yielder is done, so there is
+ // no more work to do; continuing to iterate would loop forever.
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Compute and add any moving average columns.
+ *
+ * <p>Normally, the row passed in will be added to all the {@link Averager}'s and then results pulled
+ * from each averager. If skip is true, then the incoming row is actually a dummy value due to
+ * no data being present for this dimension combination in the current bucket. When this happens,
+ * {@link Averager#skip()} should be called instead of {@link Averager#addElement(Map, Map)} to force proper
+ * decaying of the average values.
+ *
+ * <p>Usually, the contents of key will be contained by the row R being passed in, but in the case of a
+ * dummy row, it's possible that the dimensions will be known but the row empty. Hence, the values are
+ * passed as two separate arguments.
+ *
+ * @param r The Row to operate on
+ * @param skip Indicates whether skip or add should be called
+ *
+ * @return The updated row containing averager results, or null if no averagers computed a result
+ */
+ @Nullable
+ private Row computeMovingAverage(MapBasedRow r, boolean skip)
+ {
+ Map<String, Object> event = r.getEvent();
+ Map<String, Object> result = new HashMap<>(event);
+ Map<String, Object> key = MovingAverageHelper.getDimKeyFromRow(dims, r);
+
+ List<Averager<?>> avg = averagers.get(key);
+
+ // Initialize key's averagers.
+ if (avg == null) {
+ avg = averagerFactories.stream().map(af -> af.createAverager()).collect(Collectors.toList());
+ averagers.put(key, avg);
+ }
+
+ if (!skip) {
+ avg.forEach(af -> af.addElement(event, aggMap));
+ } else {
+ avg.forEach(af -> af.skip());
+ }
+
+ avg.forEach(af -> result.put(af.getName(), af.getResult()));
+
+ // At least one non-dimension value must be in the record for it to be valid.
+ if (result.entrySet().stream().anyMatch(e -> !key.containsKey(e.getKey()) && e.getValue() != null)) {
+ result.putAll(event);
+ return new MapBasedRow(r.getTimestamp(), result);
+ } else {
+ // No averagers returned anything. All buckets must be empty.
+ // skip this row.
+ return null;
+ }
+ }
+ }
+}
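
The add-or-skip behaviour described for computeMovingAverage can be sketched in isolation. This is a simplified illustration under stated assumptions, not the patch's implementation: `MovingAverageSketch`, `WindowMean` and `processBucket` are hypothetical names, dimension keys are plain strings, and the averager is a fixed-window mean over the values present in the window, which may differ from how the real Averager implementations weight skipped buckets.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names come from the patch.
final class MovingAverageSketch
{
  // Fixed-window mean. A skipped bucket still occupies a slot (stored as NaN),
  // so old values age out of the window even when no data arrives.
  static final class WindowMean
  {
    private final int window;
    private final Deque<Double> slots = new ArrayDeque<>();

    WindowMean(int window)
    {
      this.window = window;
    }

    void add(double value)
    {
      push(value);
    }

    void skip()
    {
      push(Double.NaN);
    }

    private void push(double value)
    {
      slots.addLast(value);
      if (slots.size() > window) {
        slots.removeFirst();
      }
    }

    // Mean of the present values, or null when every slot is a skipped one.
    Double result()
    {
      double sum = 0;
      int count = 0;
      for (double v : slots) {
        if (!Double.isNaN(v)) {
          sum += v;
          count++;
        }
      }
      return count == 0 ? null : sum / count;
    }
  }

  private final int window;
  private final Map<String, WindowMean> averagers = new HashMap<>();

  MovingAverageSketch(int window)
  {
    this.window = window;
  }

  // Processes one time bucket (key -> value) and returns key -> moving average.
  Map<String, Double> processBucket(Map<String, Double> bucket)
  {
    Map<String, Double> out = new LinkedHashMap<>();

    // Keys present in this bucket: feed the value to the key's averager.
    for (Map.Entry<String, Double> e : bucket.entrySet()) {
      WindowMean avg = averagers.computeIfAbsent(e.getKey(), k -> new WindowMean(window));
      avg.add(e.getValue());
      out.put(e.getKey(), avg.result());
    }

    // Keys seen in earlier buckets but missing now: skip, and emit a row
    // only while the window still holds at least one real value.
    Set<String> unseen = new HashSet<>(averagers.keySet());
    unseen.removeAll(bucket.keySet());
    for (String key : unseen) {
      WindowMean avg = averagers.get(key);
      avg.skip();
      Double result = avg.result();
      if (result != null) {
        out.put(key, result);
      }
    }
    return out;
  }
}
```

In this sketch a key that stops appearing keeps producing rows until its window contains only skipped slots, which mirrors the "return null if no averagers computed a result" rule above.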
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQuery.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQuery.java
new file mode 100644
index 0000000..38fc1eb
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQuery.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.groupby.having.HavingSpec;
+import org.apache.druid.query.groupby.orderby.LimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class that defines the fields of the Druid movingAverage query.
+ */
+@JsonTypeName("movingAverage")
+public class MovingAverageQuery extends BaseQuery<Row>
+{
+
+ public static final String MOVING_AVG_QUERY_TYPE = "movingAverage";
+ public static final String CTX_KEY_SORT_BY_DIMS_FIRST = "sortByDimsFirst";
+
+ private final LimitSpec limitSpec;
+ private final HavingSpec havingSpec;
+ private final DimFilter dimFilter;
+ private final Function<Sequence<Row>, Sequence<Row>> limitFn;
+ private final Granularity granularity;
+ private final List<DimensionSpec> dimensions;
+ private final List<AggregatorFactory> aggregatorSpecs;
+ private final List<PostAggregator> postAggregatorSpecs;
+ private final List<AveragerFactory<?, ?>> averagerSpecs;
+ private final List<PostAggregator> postAveragerSpecs;
+
+ @JsonCreator
+ public MovingAverageQuery(
+ @JsonProperty("dataSource") DataSource dataSource,
+ @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+ @JsonProperty("filter") DimFilter dimFilter,
+ @JsonProperty("granularity") Granularity granularity,
+ @JsonProperty("dimensions") List<DimensionSpec> dimensions,
+ @JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
+ @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
+ @JsonProperty("having") HavingSpec havingSpec,
+ @JsonProperty("averagers") List<AveragerFactory<?, ?>> averagerSpecs,
+ @JsonProperty("postAveragers") List<PostAggregator> postAveragerSpecs,
+ @JsonProperty("limitSpec") LimitSpec limitSpec,
+ @JsonProperty("context") Map<String, Object> context
+ )
+ {
+ super(dataSource, querySegmentSpec, false, context);
+
+ //TBD: Implement null awareness to respect the contract of this flag.
+ Preconditions.checkArgument(NullHandling.replaceWithDefault(), "movingAverage does not support druid.generic.useDefaultValueForNull=false");
+
+ this.dimFilter = dimFilter;
+ this.granularity = granularity;
+ this.dimensions = dimensions == null ? ImmutableList.of() : dimensions;
+ for (DimensionSpec spec : this.dimensions) {
+ Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec");
+ }
+ this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
+ this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs;
+ this.averagerSpecs = averagerSpecs == null ? ImmutableList.of() : averagerSpecs;
+ this.postAveragerSpecs = postAveragerSpecs == null ? ImmutableList.of() : postAveragerSpecs;
+ this.havingSpec = havingSpec;
+ this.limitSpec = (limitSpec == null) ? NoopLimitSpec.INSTANCE : limitSpec;
+
+ Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
+
+ verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
+
+ // build combined list of aggregators and averagers so that limit spec building is happy
+ List<AggregatorFactory> combinedAggregatorSpecs = new ArrayList<>();
+ combinedAggregatorSpecs.addAll(this.aggregatorSpecs);
+ for (AveragerFactory<?, ?> avg : this.averagerSpecs) {
+ combinedAggregatorSpecs.add(new AveragerFactoryWrapper(avg, ""));
+ }
+
+ Function<Sequence<Row>, Sequence<Row>> postProcFn =
+ this.limitSpec.build(
+ this.dimensions,
+ combinedAggregatorSpecs,
+ this.postAggregatorSpecs,
+ this.granularity,
+ getContextSortByDimsFirst()
+ );
+
+ if (havingSpec != null) {
+ postProcFn = Functions.compose(
+ postProcFn,
+ new Function<Sequence<Row>, Sequence<Row>>()
+ {
+ @Override
+ public Sequence<Row> apply(Sequence<Row> input)
+ {
+ return Sequences.filter(
+ input,
+ new Predicate<Row>()
+ {
+ @Override
+ public boolean apply(Row input)
+ {
+ return MovingAverageQuery.this.havingSpec.eval(input);
+ }
+ }
+ );
+ }
+ }
+ );
+ }
+
+ this.limitFn = postProcFn;
+
+ }
+
+ private static void verifyOutputNames(
+ List<DimensionSpec> dimensions,
+ List<AggregatorFactory> aggregators,
+ List<PostAggregator> postAggregators
+ )
+ {
+
+ final Set<String> outputNames = new HashSet<>();
+ for (DimensionSpec dimension : dimensions) {
+ if (!outputNames.add(dimension.getOutputName())) {
+ throw new IAE("Duplicate output name[%s]", dimension.getOutputName());
+ }
+ }
+
+ for (AggregatorFactory aggregator : aggregators) {
+ if (!outputNames.add(aggregator.getName())) {
+ throw new IAE("Duplicate output name[%s]", aggregator.getName());
+ }
+ }
+
+ for (PostAggregator postAggregator : postAggregators) {
+ if (!outputNames.add(postAggregator.getName())) {
+ throw new IAE("Duplicate output name[%s]", postAggregator.getName());
+ }
+ }
+ }
+
+ /**
+ * A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks
+ * have already passed in order for the object to exist.
+ */
+ private MovingAverageQuery(
+ DataSource dataSource,
+ QuerySegmentSpec querySegmentSpec,
+ DimFilter dimFilter,
+ Granularity granularity,
+ List<DimensionSpec> dimensions,
+ List<AggregatorFactory> aggregatorSpecs,
+ List<AveragerFactory<?, ?>> averagerSpecs,
+ List<PostAggregator> postAggregatorSpecs,
+ List<PostAggregator> postAveragerSpecs,
+ HavingSpec havingSpec,
+ LimitSpec orderBySpec,
+ Function<Sequence<Row>, Sequence<Row>> limitFn,
+ Map<String, Object> context
+ )
+ {
+ super(dataSource, querySegmentSpec, false, context);
+
+ this.dimFilter = dimFilter;
+ this.granularity = granularity;
+ this.dimensions = dimensions;
+ this.aggregatorSpecs = aggregatorSpecs;
+ this.averagerSpecs = averagerSpecs;
+ this.postAggregatorSpecs = postAggregatorSpecs;
+ this.postAveragerSpecs = postAveragerSpecs;
+ this.havingSpec = havingSpec;
+ this.limitSpec = orderBySpec;
+ this.limitFn = limitFn;
+ }
+
+ @Override
+ public boolean hasFilters()
+ {
+ return dimFilter != null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return MOVING_AVG_QUERY_TYPE;
+ }
+
+ @JsonIgnore
+ public boolean getContextSortByDimsFirst()
+ {
+ return getContextBoolean(CTX_KEY_SORT_BY_DIMS_FIRST, false);
+ }
+
+ @Override
+ @JsonProperty
+ public DimFilter getFilter()
+ {
+ return dimFilter;
+ }
+
+ @Override
+ @JsonProperty
+ public Granularity getGranularity()
+ {
+ return granularity;
+ }
+
+ @JsonProperty
+ public List<DimensionSpec> getDimensions()
+ {
+ return dimensions;
+ }
+
+ @JsonProperty("aggregations")
+ public List<AggregatorFactory> getAggregatorSpecs()
+ {
+ return aggregatorSpecs;
+ }
+
+ @JsonProperty("averagers")
+ public List<AveragerFactory<?, ?>> getAveragerSpecs()
+ {
+ return averagerSpecs;
+ }
+
+ @JsonProperty("postAggregations")
+ public List<PostAggregator> getPostAggregatorSpecs()
+ {
+ return postAggregatorSpecs;
+ }
+
+ @JsonProperty("postAveragers")
+ public List<PostAggregator> getPostAveragerSpecs()
+ {
+ return postAveragerSpecs;
+ }
+
+ @JsonProperty("having")
+ public HavingSpec getHavingSpec()
+ {
+ return havingSpec;
+ }
+
+ @JsonProperty
+ public LimitSpec getLimitSpec()
+ {
+ return limitSpec;
+ }
+
+ @Override
+ public MovingAverageQuery withOverriddenContext(Map contextOverride)
+ {
+ return new MovingAverageQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ computeOverridenContext(contextOverride)
+ );
+ }
+
+ @Override
+ public MovingAverageQuery withQuerySegmentSpec(QuerySegmentSpec spec)
+ {
+ return new MovingAverageQuery(
+ getDataSource(),
+ spec,
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ getContext()
+ );
+ }
+
+ @Override
+ public Query<Row> withDataSource(DataSource dataSource)
+ {
+ return new MovingAverageQuery(
+ dataSource,
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ getContext()
+ );
+ }
+
+ public Query<Row> withPostAveragers(List<PostAggregator> postAveragerSpecs)
+ {
+ return new MovingAverageQuery(
+ getDataSource(),
+ getQuerySegmentSpec(),
+ dimFilter,
+ granularity,
+ dimensions,
+ aggregatorSpecs,
+ averagerSpecs,
+ postAggregatorSpecs,
+ postAveragerSpecs,
+ havingSpec,
+ limitSpec,
+ limitFn,
+ getContext()
+ );
+ }
+
+ public Sequence<Row> applyLimit(Sequence<Row> results)
+ {
+ return limitFn.apply(results);
+ }
+}
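
The constructor above composes the limit function with the having filter via Functions.compose(postProcFn, havingFilter), which means the having filter runs first and the limit/ordering function runs on its output. The ordering can be illustrated with the JDK's own Function.compose, which has the same g(f(x)) semantics. This is a sketch with hypothetical names (`PostProcessingOrderSketch`, `build`), using integer lists in place of Sequence<Row>.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch; integer lists stand in for Sequence<Row>.
final class PostProcessingOrderSketch
{
  static Function<List<Integer>, List<Integer>> build(int minValue, int limit)
  {
    // Stand-in for the function returned by LimitSpec.build(...): keep the first `limit` rows.
    Function<List<Integer>, List<Integer>> limitFn =
        rows -> rows.stream().limit(limit).collect(Collectors.toList());

    // Stand-in for the HavingSpec filter: keep rows whose value is at least minValue.
    Function<List<Integer>, List<Integer>> havingFn =
        rows -> rows.stream().filter(v -> v >= minValue).collect(Collectors.toList());

    // limitFn.compose(havingFn) evaluates limitFn(havingFn(rows)):
    // filter first, then limit.
    return limitFn.compose(havingFn);
  }

  public static void main(String[] args)
  {
    System.out.println(build(3, 2).apply(Arrays.asList(1, 5, 2, 4, 3)));
  }
}
```

Applying the limit before the filter would instead truncate to the first two rows and could then filter away rows the caller expected to see, so the order of composition is significant.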
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetrics.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetrics.java
new file mode 100644
index 0000000..6b9f39a
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.query.QueryMetrics;
+
+public interface MovingAverageQueryMetrics extends QueryMetrics<MovingAverageQuery>
+{
+ /**
+ * Sets the size of {@link MovingAverageQuery#getDimensions()} of the given query as dimension.
+ */
+ void numDimensions(MovingAverageQuery query);
+
+ /**
+ * Sets the number of metrics of the given moving average query as dimension.
+ */
+ void numMetrics(MovingAverageQuery query);
+
+ /**
+ * Sets the number of "complex" metrics of the given moving average query as dimension. By default, a "complex"
+ * metric is assumed to be any metric that is not of long or double type, but this could be redefined in the
+ * implementation of this method.
+ */
+ void numComplexMetrics(MovingAverageQuery query);
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetricsFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetricsFactory.java
new file mode 100644
index 0000000..db344a0
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryMetricsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+/**
+ * Implementations could be injected using
+ * <p>
+ * PolyBind
+ * .optionBinder(binder, Key.get(MovingAverageQueryMetricsFactory.class))
+ * .addBinding("myCustomMovingAverageQueryMetricsFactory")
+ * .to(MyCustomMovingAverageQueryMetricsFactory.class);
+ * <p>
+ * And then setting property:
+ * druid.query.movingavgquery.queryMetricsFactory=myCustomMovingAverageQueryMetricsFactory
+ */
+public interface MovingAverageQueryMetricsFactory
+{
+ MovingAverageQueryMetrics makeMetrics();
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryModule.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryModule.java
new file mode 100644
index 0000000..9655678
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryModule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import com.google.inject.multibindings.MapBinder;
+import org.apache.druid.guice.DruidBinders;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryToolChest;
+
+import java.util.Collections;
+import java.util.List;
+
+public class MovingAverageQueryModule implements DruidModule
+{
+
+ @Override
+ public void configure(Binder binder)
+ {
+ MapBinder<Class<? extends Query>, QueryToolChest> toolChests = DruidBinders.queryToolChestBinder(binder);
+
+ // Register MovingAverageQueryToolChest as the toolchest for MovingAverageQuery
+ toolChests.addBinding(MovingAverageQuery.class).to(MovingAverageQueryToolChest.class);
+
+ // Bind the toolchest itself as a lazy singleton
+ binder.bind(MovingAverageQueryToolChest.class).in(LazySingleton.class);
+ }
+
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return Collections.<Module>singletonList(new SimpleModule("MovingAverageQueryModule")
+ .registerSubtypes(new NamedType(
+ MovingAverageQuery.class,
+ "movingAverage"
+ )));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java
new file mode 100644
index 0000000..8834d0d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryRunner.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.server.QueryStats;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * The QueryRunner for MovingAverage query.
+ * High level flow:
+ * 1. Invokes an inner groupBy query (or timeseries, when there are no dimensions) to get Aggregations/PostAggregations.
+ * 2. Result is passed to {@link RowBucketIterable}, which groups rows of all dimension combinations into period-based (e.g. daily) buckets of rows ({@link RowBucket}).
+ * 3. The sequence is passed to {@link MovingAverageIterable}, which performs the main part of the query: adding the Averager computations to the records.
+ * 4. Finishes up by applying post averagers, removing redundant dates, and applying post phases (having, sorting, limits).
+ */
+public class MovingAverageQueryRunner implements QueryRunner<Row>
+{
+
+ public static final String QUERY_FAIL_TIME = "queryFailTime";
+ public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered";
+
+ private final QuerySegmentWalker walker;
+ private final RequestLogger requestLogger;
+
+ public MovingAverageQueryRunner(
+ @Nullable QuerySegmentWalker walker,
+ RequestLogger requestLogger
+ )
+ {
+ this.walker = walker;
+ this.requestLogger = requestLogger;
+ }
+
+ @Override
+ public Sequence<Row> run(QueryPlus<Row> query, Map<String, Object> responseContext)
+ {
+
+ MovingAverageQuery maq = (MovingAverageQuery) query.getQuery();
+ List<Interval> intervals;
+ final Period period;
+
+ // Get the largest bucket from the list of averagers
+ Optional<Integer> opt =
+ maq.getAveragerSpecs().stream().map(AveragerFactory::getNumBuckets).max(Integer::compare);
+ int buckets = opt.orElse(0);
+
+ // Extend the start of each interval back by (buckets - 1) periods so the first reported bucket has a full window
+ if (maq.getGranularity() instanceof PeriodGranularity) {
+ period = ((PeriodGranularity) maq.getGranularity()).getPeriod();
+ int offset = buckets <= 0 ? 0 : (1 - buckets);
+ intervals = maq.getIntervals()
+ .stream()
+ .map(i -> new Interval(i.getStart().withPeriodAdded(period, offset), i.getEnd()))
+ .collect(Collectors.toList());
+ } else {
+ throw new ISE("Only PeriodGranularity is supported for movingAverage queries");
+ }
+
+ Sequence<Row> resultsSeq;
+ DataSource dataSource = maq.getDataSource();
+ if (maq.getDimensions() != null && !maq.getDimensions().isEmpty() &&
+ (dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource ||
+ dataSource instanceof QueryDataSource)) {
+ // build groupBy query from movingAverage query
+ GroupByQuery.Builder builder = GroupByQuery.builder()
+ .setDataSource(dataSource)
+ .setInterval(intervals)
+ .setDimFilter(maq.getFilter())
+ .setGranularity(maq.getGranularity())
+ .setDimensions(maq.getDimensions())
+ .setAggregatorSpecs(maq.getAggregatorSpecs())
+ .setPostAggregatorSpecs(maq.getPostAggregatorSpecs())
+ .setContext(maq.getContext());
+ GroupByQuery gbq = builder.build();
+
+ HashMap<String, Object> gbqResponse = new HashMap<>();
+ gbqResponse.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(gbq));
+ gbqResponse.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
+
+ Sequence<Row> results = gbq.getRunner(walker).run(QueryPlus.wrap(gbq), gbqResponse);
+ try {
+ // use localhost for remote address
+ requestLogger.logNativeQuery(RequestLogLine.forNative(
+ gbq,
+ DateTimes.nowUtc(),
+ "127.0.0.1",
+ new QueryStats(
+ ImmutableMap.of(
+ "query/time", 0,
+ "query/bytes", 0,
+ "success", true
+ ))
+ ));
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ resultsSeq = results;
+ } else {
+ // no dimensions, so optimize this as a TimeSeries
+ TimeseriesQuery tsq = new TimeseriesQuery(
+ dataSource,
+ new MultipleIntervalSegmentSpec(intervals),
+ false,
+ null,
+ maq.getFilter(),
+ maq.getGranularity(),
+ maq.getAggregatorSpecs(),
+ maq.getPostAggregatorSpecs(),
+ 0,
+ maq.getContext()
+ );
+ HashMap<String, Object> tsqResponse = new HashMap<>();
+ tsqResponse.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(tsq));
+ tsqResponse.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
+
+ Sequence<Result<TimeseriesResultValue>> results = tsq.getRunner(walker).run(QueryPlus.wrap(tsq), tsqResponse);
+ try {
+ // use localhost for remote address
+ requestLogger.logNativeQuery(RequestLogLine.forNative(
+ tsq,
+ DateTimes.nowUtc(),
+ "127.0.0.1",
+ new QueryStats(
+ ImmutableMap.of(
+ "query/time", 0,
+ "query/bytes", 0,
+ "success", true
+ ))
+ ));
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ resultsSeq = Sequences.map(results, new TimeseriesResultToRow());
+ }
+
+ // Process into period buckets
+ Sequence<RowBucket> bucketedMovingAvgResults =
+ Sequences.simple(new RowBucketIterable(resultsSeq, intervals, period));
+
+ // Apply the window analysis functions (averagers)
+ Sequence<Row> movingAvgResults = Sequences.simple(
+ new MovingAverageIterable(
+ bucketedMovingAvgResults,
+ maq.getDimensions(),
+ maq.getAveragerSpecs(),
+ maq.getPostAggregatorSpecs(),
+ maq.getAggregatorSpecs()
+ ));
+
+ // Apply any postAveragers
+ Sequence<Row> movingAvgResultsWithPostAveragers =
+ Sequences.map(movingAvgResults, new PostAveragerAggregatorCalculator(maq));
+
+ // remove rows outside the reporting window
+ List<Interval> reportingIntervals = maq.getIntervals();
+ movingAvgResults =
+ Sequences.filter(
+ movingAvgResultsWithPostAveragers,
+ row -> reportingIntervals.stream().anyMatch(i -> i.contains(row.getTimestamp()))
+ );
+
+ // Apply any having, sorting, and limits
+ movingAvgResults = maq.applyLimit(movingAvgResults);
+
+ return movingAvgResults;
+
+ }
+
+ static class TimeseriesResultToRow implements Function<Result<TimeseriesResultValue>, Row>
+ {
+ @Override
+ public Row apply(Result<TimeseriesResultValue> lookbackResult)
+ {
+ Map<String, Object> event = lookbackResult.getValue().getBaseObject();
+ MapBasedRow row = new MapBasedRow(lookbackResult.getTimestamp(), event);
+ return row;
+ }
+ }
+}
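Note on the interval extension in MovingAverageQueryRunner.run() above: the start of each queried interval is moved back by (buckets - 1) periods, where buckets is the largest getNumBuckets() among the averagers. The sketch below illustrates only that arithmetic. It is not part of the patch: it uses java.time instead of the Joda-Time types used in the runner, and the names LookbackWindowSketch and extendStart are hypothetical.

```java
import java.time.LocalDate;
import java.time.Period;

final class LookbackWindowSketch
{
  /**
   * Hypothetical helper mirroring the runner's offset computation:
   * a window of N buckets needs N - 1 extra periods before the reporting start.
   */
  static LocalDate extendStart(LocalDate start, Period period, int maxBuckets)
  {
    // Same expression as in the runner: no extension when there are no averagers.
    int offset = maxBuckets <= 0 ? 0 : (1 - maxBuckets);
    // A negative multiple of the period moves the start backwards.
    return start.plus(period.multipliedBy(offset));
  }

  public static void main(String[] args)
  {
    // Daily granularity, largest averager window of 7 buckets.
    System.out.println(extendStart(LocalDate.of(2019, 4, 10), Period.ofDays(1), 7)); // prints 2019-04-04
  }
}
```

With a daily period and a 7-bucket averager, an interval starting 2019-04-10 is queried from 2019-04-04. Rows before 2019-04-10 only feed the averagers; the runner later drops them with the reporting-window filter.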
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryToolChest.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryToolChest.java
new file mode 100644
index 0000000..b0e14af
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageQueryToolChest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.MetricManipulationFn;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.server.log.RequestLogger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The QueryToolChest for MovingAverage Query
+ */
+public class MovingAverageQueryToolChest extends QueryToolChest<Row, MovingAverageQuery>
+{
+
+ private final QuerySegmentWalker walker;
+ private final RequestLogger requestLogger;
+
+ private final MovingAverageQueryMetricsFactory movingAverageQueryMetricsFactory;
+
+ /**
+ * Construct a MovingAverageQueryToolChest for processing moving-average queries.
+ * MovingAverage queries are expected to be processed on broker nodes and never hit historical nodes.
+ *
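+ * @param walker provider of the {@link QuerySegmentWalker} used to run the inner groupBy/timeseries query
+ * @param requestLogger logger used to record the inner query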
+ */
+ @Inject
+ public MovingAverageQueryToolChest(Provider<QuerySegmentWalker> walker, RequestLogger requestLogger)
+ {
+
+ this.walker = walker.get();
+ this.requestLogger = requestLogger;
+ this.movingAverageQueryMetricsFactory = DefaultMovingAverageQueryMetricsFactory.instance();
+ }
+
+ @Override
+ public QueryRunner<Row> mergeResults(QueryRunner<Row> runner)
+ {
+ return new MovingAverageQueryRunner(walker, requestLogger);
+ }
+
+ @Override
+ public QueryMetrics<? super MovingAverageQuery> makeMetrics(MovingAverageQuery query)
+ {
+ MovingAverageQueryMetrics movingAverageQueryMetrics = movingAverageQueryMetricsFactory.makeMetrics();
+ movingAverageQueryMetrics.query(query);
+ return movingAverageQueryMetrics;
+ }
+
+ @Override
+ public Function<Row, Row> makePostComputeManipulatorFn(MovingAverageQuery query, MetricManipulationFn fn)
+ {
+
+ return new Function<Row, Row>()
+ {
+
+ @Override
+ public Row apply(Row result)
+ {
+ MapBasedRow mRow = (MapBasedRow) result;
+ final Map<String, Object> values = new HashMap<>(mRow.getEvent());
+
+ for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+ Object aggVal = values.get(agg.getName());
+ if (aggVal != null) {
+ values.put(agg.getName(), fn.manipulate(agg, aggVal));
+ } else {
+ values.put(agg.getName(), null);
+ }
+ }
+
+ for (AveragerFactory<?, ?> avg : query.getAveragerSpecs()) {
+ Object aggVal = values.get(avg.getName());
+ if (aggVal != null) {
+ values.put(avg.getName(), fn.manipulate(new AveragerFactoryWrapper<>(avg, avg.getName() + "_"), aggVal));
+ } else {
+ values.put(avg.getName(), null);
+ }
+ }
+
+ return new MapBasedRow(result.getTimestamp(), values);
+
+ }
+ };
+
+ }
+
+
+ @Override
+ public TypeReference<Row> getResultTypeReference()
+ {
+ return new TypeReference<Row>()
+ {
+ };
+ }
+
+ @Override
+ public Function<Row, Row> makePreComputeManipulatorFn(MovingAverageQuery query, MetricManipulationFn fn)
+ {
+ return Functions.identity();
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculator.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculator.java
new file mode 100644
index 0000000..5af3487
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.query.aggregation.PostAggregator;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Function that can be applied to a Sequence to calculate PostAverager results
+ */
+public class PostAveragerAggregatorCalculator implements Function<Row, Row>
+{
+
+ private final List<PostAggregator> postAveragers;
+
+ public PostAveragerAggregatorCalculator(MovingAverageQuery maq)
+ {
+ this.postAveragers = maq.getPostAveragerSpecs();
+ }
+
+ @Override
+ public Row apply(final Row row)
+ {
+ if (postAveragers.isEmpty()) {
+ return row;
+ }
+
+ final Map<String, Object> newMap;
+
+ newMap = Maps.newLinkedHashMap(((MapBasedRow) row).getEvent());
+
+ for (PostAggregator postAverager : postAveragers) {
+ boolean allColsPresent = postAverager.getDependentFields().stream().allMatch(c -> newMap.get(c) != null);
+ newMap.put(postAverager.getName(), allColsPresent ? postAverager.compute(newMap) : null);
+ }
+
+ return new MapBasedRow(row.getTimestamp(), newMap);
+ }
+
+}
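Note on PostAveragerAggregatorCalculator.apply() above: a post averager is computed only when every field it depends on is present and non-null in the row; otherwise its column is set to null. The sketch below shows the same rule on plain maps. It is an illustration under assumptions: NullSafePostComputeSketch and its apply() signature are hypothetical, and a java.util.function.Function stands in for Druid's PostAggregator.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class NullSafePostComputeSketch
{
  /**
   * Returns a copy of the event with one extra column. The column is computed
   * only if all dependent fields are non-null; otherwise it is set to null.
   */
  static Map<String, Object> apply(
      Map<String, Object> event,
      String name,
      List<String> dependentFields,
      Function<Map<String, Object>, Object> compute
  )
  {
    // Copy first so the input row is left untouched.
    Map<String, Object> result = new LinkedHashMap<>(event);
    boolean allPresent = dependentFields.stream().allMatch(f -> result.get(f) != null);
    result.put(name, allPresent ? compute.apply(result) : null);
    return result;
  }
}
```

The column is always added, even when it is null, so every output row keeps the same set of keys.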
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucket.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucket.java
new file mode 100644
index 0000000..fa614fa
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucket.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+/**
+ * Represents the rows for a specific date (one period bucket).
+ * Each RowBucket is an element of a linked list: it holds a pointer to the next RowBucket.
+ */
+public class RowBucket
+{
+ private final DateTime dateTime;
+ private final List<Row> rows;
+ private RowBucket nextBucket = null;
+
+ public RowBucket(DateTime dateTime, List<Row> rows)
+ {
+ this.dateTime = dateTime;
+ this.rows = rows;
+ }
+
+ public DateTime getDateTime()
+ {
+ return dateTime;
+ }
+
+ public List<Row> getRows()
+ {
+ return rows;
+ }
+
+ public RowBucket getNextBucket()
+ {
+ return nextBucket;
+ }
+
+ public void setNextBucket(RowBucket nextBucket)
+ {
+ this.nextBucket = nextBucket;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
new file mode 100644
index 0000000..308d555
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/RowBucketIterable.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An iterable which takes a sequence of rows ({@link Sequence<Row>}) and generates a sequence of {@link RowBucket}s from it.
+ *
+ * It uses {@link BucketingAccumulator} for naive bucketing into period buckets,
+ * and adds more subtle logic to cover edge cases, such as:
+ * - Handling periods with no rows.
+ * - Handling the last record.
+ *
+ * Note that this is called by {@link MovingAverageIterable.MovingAverageIterator#internalNext()},
+ * and the logic for skipping records arises from the interaction between the two classes.
+ */
+public class RowBucketIterable implements Iterable<RowBucket>
+{
+
+ public final Sequence<Row> seq;
+ private List<Interval> intervals;
+ private Period period;
+
+ public RowBucketIterable(Sequence<Row> seq, List<Interval> intervals, Period period)
+ {
+ this.seq = seq;
+ this.period = period;
+ this.intervals = intervals;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Iterable#iterator()
+ */
+ @Override
+ public Iterator<RowBucket> iterator()
+ {
+ return new RowBucketIterator(seq, intervals, period);
+ }
+
+ static class RowBucketIterator implements Iterator<RowBucket>
+ {
+ private Yielder<RowBucket> yielder;
+ private DateTime endTime;
+ private DateTime expectedBucket;
+ private Period period;
+ private int intervalIndex = 0;
+ private List<Interval> intervals;
+ private boolean processedLastRow = false;
+ private boolean processedExtraRow = false;
+
+ public RowBucketIterator(Sequence<Row> rows, List<Interval> intervals, Period period)
+ {
+ this.period = period;
+ this.intervals = intervals;
+ expectedBucket = intervals.get(intervalIndex).getStart();
+ endTime = intervals.get(intervals.size() - 1).getEnd();
+ yielder = rows.toYielder(null, new BucketingAccumulator());
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#hasNext()
+ */
+ @Override
+ public boolean hasNext()
+ {
+ return expectedBucket.compareTo(endTime) < 0 || !this.yielder.isDone();
+ }
+
+ /* (non-Javadoc)
+ * @see java.util.Iterator#next()
+ */
+ @Override
+ public RowBucket next()
+ {
+ RowBucket currentBucket = yielder.get();
+
+ // Iterate to next interval
+ if (expectedBucket.compareTo(intervals.get(intervalIndex).getEnd()) >= 0) {
+ intervalIndex++;
+ if (intervalIndex < intervals.size()) {
+ expectedBucket = intervals.get(intervalIndex).getStart();
+ }
+ }
+ // currentBucket > expectedBucket (No rows found for period). Iterate to next period.
+ if (currentBucket != null && currentBucket.getDateTime().compareTo(expectedBucket) > 0) {
+ currentBucket = new RowBucket(expectedBucket, Collections.emptyList());
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ }
+
+ if (!yielder.isDone()) {
+ // standard case. return regular row
+ yielder = yielder.next(currentBucket);
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ } else if (!processedLastRow && yielder.get() != null && yielder.get().getNextBucket() == null) {
+ // yielder.isDone, processing last row
+ processedLastRow = true;
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ } else if (!processedExtraRow && yielder.get() != null && yielder.get().getNextBucket() != null) {
+ RowBucket lastRow = yielder.get().getNextBucket();
+
+ if (lastRow.getDateTime().compareTo(expectedBucket) > 0) {
+ lastRow = new RowBucket(expectedBucket, Collections.emptyList());
+ expectedBucket = expectedBucket.plus(period);
+ return lastRow;
+ }
+
+ // yielder is done, processing newBucket
+ processedExtraRow = true;
+ expectedBucket = expectedBucket.plus(period);
+ return lastRow;
+ } else if (expectedBucket.compareTo(endTime) < 0) {
+ // add any trailing blank rows
+ currentBucket = new RowBucket(expectedBucket, Collections.emptyList());
+ expectedBucket = expectedBucket.plus(period);
+ return currentBucket;
+ } else {
+ // we should never get here
+ throw new NoSuchElementException();
+ }
+
+ }
+ }
+
+}
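Note on RowBucketIterable above: besides passing through the buckets produced by the accumulator, the iterator emits an empty bucket for every period that has no rows, including trailing periods up to the interval end. The sketch below shows only that gap-filling idea over a single interval. It is a deliberate simplification, not the yielder-based state machine in the patch: GapFillingSketch and fillGaps are hypothetical names, java.time replaces Joda-Time, and rows are assumed to be already grouped by bucket start.

```java
import java.time.LocalDate;
import java.time.Period;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GapFillingSketch
{
  /**
   * Returns one bucket per period in [start, end), in order. Periods without
   * rows get an empty list, so downstream averagers can record a missing entry.
   */
  static <R> Map<LocalDate, List<R>> fillGaps(
      Map<LocalDate, List<R>> rowsByBucket,
      LocalDate start,
      LocalDate end,
      Period period
  )
  {
    if (period.isZero() || period.isNegative()) {
      throw new IllegalArgumentException("period must be positive");
    }
    Map<LocalDate, List<R>> buckets = new LinkedHashMap<>();
    // Walk the expected bucket starts, as the iterator does with expectedBucket.
    for (LocalDate expected = start; expected.isBefore(end); expected = expected.plus(period)) {
      buckets.put(expected, rowsByBucket.getOrDefault(expected, Collections.emptyList()));
    }
    return buckets;
  }
}
```

For daily buckets over 2019-04-01/2019-04-04 with rows only on the 1st and the 3rd, this yields three buckets, the middle one empty.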
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/Averager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/Averager.java
new file mode 100644
index 0000000..506380c
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/Averager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import java.util.Map;
+
+/**
+ * Interface for an averager
+ *
+ * @param <R> The return type of the averager
+ */
+public interface Averager<R>
+{
+ /**
+ * Add a row to the window being operated on
+ *
+ * @param e The row to add
+ * @param aggMap The Map of AggregatorFactory used to determine if the metric should be finalized
+ */
+ void addElement(Map<String, Object> e, Map<String, AggregatorFactory> aggMap);
+
+ /**
+ * There is a missing row, so record a missing entry in the window
+ */
+ void skip();
+
+ /**
+ * Compute the resulting "average" over the collected window
+ *
+ * @return the "average" over the window of buckets
+ */
+ R getResult();
+
+ /**
+ * @return the name
+ */
+ String getName();
+}
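Note on the Averager contract above: an averager collects one entry per bucket through addElement(), records a missing bucket through skip(), and reports a value over the current window through getResult(). The sketch below is one possible shape for such a window, written as a fixed-size circular buffer. It is a hypothetical illustration, not one of the averagers in this patch: WindowMeanSketch is an invented name, it takes a plain double instead of a row map and aggregator map, and ignoring missing buckets in the mean is an assumption made here for the example.

```java
final class WindowMeanSketch
{
  // One slot per bucket in the window; null marks a skipped (missing) bucket.
  private final Double[] window;
  private int next = 0;

  WindowMeanSketch(int numBuckets)
  {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive");
    }
    this.window = new Double[numBuckets];
  }

  /** Adds a value for the current bucket, overwriting the oldest slot. */
  void addElement(double value)
  {
    window[next] = value;
    next = (next + 1) % window.length;
  }

  /** Records a missing bucket so that old values still age out of the window. */
  void skip()
  {
    window[next] = null;
    next = (next + 1) % window.length;
  }

  /** Mean of the non-missing buckets in the window, or null if there are none. */
  Double getResult()
  {
    double sum = 0;
    int present = 0;
    for (Double v : window) {
      if (v != null) {
        sum += v;
        present++;
      }
    }
    if (present == 0) {
      return null;
    }
    return sum / present;
  }
}
```

Calling skip() rather than doing nothing matters: without it, a gap in the data would leave stale values in the window for longer than numBuckets periods.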
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/AveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/AveragerFactory.java
new file mode 100644
index 0000000..a3c8278
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/AveragerFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Interface representing Averager in the movingAverage query.
+ *
+ * @param <R> Type returned by the underlying averager.
+ * @param <F> Type of finalized value.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "constant", value = ConstantAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMean", value = DoubleMeanAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMeanNoNulls", value = DoubleMeanNoNullAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMax", value = DoubleMaxAveragerFactory.class),
+ @JsonSubTypes.Type(name = "doubleMin", value = DoubleMinAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMean", value = LongMeanAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMeanNoNulls", value = LongMeanNoNullAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMax", value = LongMaxAveragerFactory.class),
+ @JsonSubTypes.Type(name = "longMin", value = LongMinAveragerFactory.class)
+})
+public interface AveragerFactory<R, F>
+{
+ int DEFAULT_PERIOD = 1;
+
+ /**
+ * Gets the column name that will be populated by the Averager
+ *
+ * @return The column name
+ */
+ String getName();
+
+ /**
+ * Returns the window size over which the averaging calculations will be
+ * performed. Size is computed in terms of buckets rather than absolute time.
+ *
+ * @return The window size
+ */
+ int getNumBuckets();
+
+ /**
+ * Returns the cycle size (number of periods to skip during averaging calculations).
+ *
+ * @return The cycle size
+ */
+ int getCycleSize();
+
+ /**
+ * Create an Averager for a specific dimension combination.
+ *
+ * @return The {@link Averager}
+ */
+ Averager<R> createAverager();
+
+ /**
+ * Gets the list of dependent fields that will be used by this Averager. Most
+ * {@link Averager}s depend on only a single field from the underlying query, but
+ * that is not required. This method allows the required fields to be communicated
+ * back to the main query so that the presence of those fields can be
+ * validated.
+ *
+ * @return A list of field names
+ */
+ List<String> getDependentFields();
+
+ /**
+ * Returns a {@link Comparator} that can be used to compare result values for
+ * purposes of sorting the end result of the query.
+ *
+ * @return A {@link Comparator}
+ */
+ Comparator<R> getComparator();
+
+ /**
+ * Finalize result value.
+ *
+ * @param val the value to finalize.
+ *
+ * @return The finalized value.
+ */
+ F finalizeComputation(R val);
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAverager.java
new file mode 100644
index 0000000..0c236b8
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAverager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import java.lang.reflect.Array;
+import java.util.Map;
+
+/**
+ * Common base class available for use by averagers. The base class implements methods that
+ * capture incoming and skipped rows and store them in an array, to be used later for
+ * calculating the actual value.
+ *
+ * @param <I> The type of intermediate value to be retrieved from the row and stored
+ * @param <R> The type of result the averager is expected to produce
+ */
+public abstract class BaseAverager<I, R extends Object> implements Averager<R>
+{
+
+ final int numBuckets;
+ final int cycleSize;
+ private final String name;
+ private final String fieldName;
+ final I[] buckets;
+ private int index;
+
+ /**
+ * {@link BaseAverager#startFrom} is needed because the `buckets` field is a fixed-size array, not a list.
+ * It makes computeResult() start from the correct bucket in the array.
+ */
+ int startFrom = 0;
+
+ /**
+ * @param storageType The class to use for storing intermediate values
+ * @param numBuckets The number of buckets to include in the window being aggregated
+ * @param name The name of the resulting metric
+ * @param fieldName The field to extract from incoming rows and store in the window cache
+ * @param cycleSize Cycle group size. Used to calculate day-of-week option. Default=1 (single element in group).
+ */
+ public BaseAverager(Class<I> storageType, int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ this.numBuckets = numBuckets;
+ this.name = name;
+ this.fieldName = fieldName;
+ this.index = 0;
+ @SuppressWarnings("unchecked")
+ final I[] array = (I[]) Array.newInstance(storageType, numBuckets);
+ this.buckets = array;
+ this.cycleSize = cycleSize;
+ }
+
+
+ /* (non-Javadoc)
+ * @see Averager#addElement(java.util.Map, java.util.Map)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void addElement(Map<String, Object> e, Map<String, AggregatorFactory> a)
+ {
+ Object metric = e.get(fieldName);
+ I finalMetric;
+ if (a.containsKey(fieldName)) {
+ AggregatorFactory af = a.get(fieldName);
+ finalMetric = metric != null ? (I) af.finalizeComputation(metric) : null;
+ } else {
+ finalMetric = (I) metric;
+ }
+ buckets[index++] = finalMetric;
+ index %= numBuckets;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#skip()
+ */
+ @Override
+ public void skip()
+ {
+ buckets[index++] = null;
+ index %= numBuckets;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#getResult()
+ */
+ @Override
+ public R getResult()
+ {
+ if (!hasData()) {
+ return null;
+ }
+ return computeResult();
+ }
+
+ /**
+ * Compute the result value to be returned by getResult.
+ *
+ * <p>This routine will only be called when there is valid data within the window
+ * and doesn't need to worry about detecting the case where no data should be returned.
+ *
+ * <p>
+ * The method typically should use {@link #getBuckets()} to retrieve the set of buckets
+ * within the window and then compute a value based on those. It should expect nulls within
+ * the array, indicating buckets where no row was found for the dimension combination. It is
+ * up to the actual implementation to determine how to evaluate those nulls.
+ *
+ * <p>
+ * The type returned is NOT required to be the same type as the intermediary value. For example,
+ * the intermediate value could be a Sketch, but the result a Long.
+ *
+ * @return the computed result
+ */
+ protected abstract R computeResult();
+
+ /* (non-Javadoc)
+ * @see Averager#getName()
+ */
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * Returns the field name to be extracted from any event rows passed in and stored
+ * for use in computing the windowed function.
+ *
+ * @return the fieldName
+ */
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ /**
+ * @return the numBuckets
+ */
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ /**
+ * @return the cycleSize
+ */
+ public int getCycleSize()
+ {
+ return cycleSize;
+ }
+
+ /**
+ * @return the array of buckets
+ */
+ protected I[] getBuckets()
+ {
+ return buckets;
+ }
+
+ /**
+ * Determines whether any data is present. If all the buckets are empty (null, not "0"), then
+ * no value should be returned from the Averager, as there were no valid rows within the window.
+ *
+ * @return true if any non-null values available
+ */
+ protected boolean hasData()
+ {
+ for (Object b : buckets) {
+ if (b != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
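
The window bookkeeping in BaseAverager can be illustrated outside Druid. What follows is a minimal, self-contained sketch rather than the extension's code: the class name RingWindowSketch and its methods are hypothetical, values are plain Doubles, and the read position is derived from the write index instead of the separate startFrom counter used above. Nulls are counted as zero here, which is one possible policy; it is up to each averager to decide.

```java
/** Hypothetical stand-in for BaseAverager plus a mean computation; not part of the extension. */
final class RingWindowSketch
{
  private final Double[] buckets;
  private final int cycleSize;
  private int index = 0; // next slot to overwrite

  RingWindowSketch(int numBuckets, int cycleSize)
  {
    if (numBuckets <= 0 || cycleSize <= 0 || numBuckets % cycleSize != 0) {
      throw new IllegalArgumentException("cycleSize must be positive and divide numBuckets");
    }
    this.buckets = new Double[numBuckets];
    this.cycleSize = cycleSize;
  }

  /** Records a value for the newest bucket; null marks a skipped (missing) row. */
  void add(Double value)
  {
    buckets[index++] = value;
    index %= buckets.length;
  }

  /** Mean over every cycleSize-th bucket, starting at the newest one; null if the window is empty. */
  Double mean()
  {
    boolean hasData = false;
    for (Double b : buckets) {
      if (b != null) {
        hasData = true;
        break;
      }
    }
    if (!hasData) {
      return null;
    }

    int newest = (index - 1 + buckets.length) % buckets.length;
    double sum = 0.0;
    int visited = 0;
    for (int i = 0; i < buckets.length; i += cycleSize) {
      Double b = buckets[(newest + i) % buckets.length];
      sum += (b != null) ? b : 0.0; // missing buckets count as zero
      visited++;
    }
    return sum / visited;
  }
}
```

With four buckets and a cycle size of 2, adding 1, 2, 3 and 4 and then asking for the mean visits only the newest bucket and the one two steps back (4 and 2), giving 3.0.
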
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java
new file mode 100644
index 0000000..831000f
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common base class for AveragerFactories
+ *
+ * @param <R> Base type that the averager should return as a result
+ * @param <F> Type that is returned from finalization
+ */
+public abstract class BaseAveragerFactory<R, F> implements AveragerFactory<R, F>
+{
+
+ protected String name;
+ protected String fieldName;
+ protected int numBuckets;
+ protected int cycleSize;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name of the Averager
+ * @param numBuckets Number of buckets in the analysis window
+ * @param fieldName Field from incoming events to include in the analysis
+ * @param cycleSize Cycle group size. Used to calculate day-of-week option. Default=1 (single element in group).
+ */
+ public BaseAveragerFactory(String name, int numBuckets, String fieldName, Integer cycleSize)
+ {
+ this.name = name;
+ this.numBuckets = numBuckets;
+ this.fieldName = fieldName;
+ this.cycleSize = (cycleSize != null) ? cycleSize : DEFAULT_PERIOD;
+ Preconditions.checkNotNull(name, "Must have a valid, non-null averager name");
+ Preconditions.checkNotNull(fieldName, "Must have a valid, non-null field name");
+ Preconditions.checkArgument(this.cycleSize > 0, "Cycle size must be greater than zero");
+ Preconditions.checkArgument(numBuckets > 0, "Bucket size must be greater than zero");
+ Preconditions.checkArgument(!(this.cycleSize > numBuckets), "Cycle size must be less than or equal to the bucket size");
+ Preconditions.checkArgument(numBuckets % this.cycleSize == 0, "cycleSize must divide numBuckets without a remainder");
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @Override
+ @JsonProperty("buckets")
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @Override
+ @JsonProperty("cycleSize")
+ public int getCycleSize()
+ {
+ return cycleSize;
+ }
+
+ @Override
+ public List<String> getDependentFields()
+ {
+ return Collections.singletonList(fieldName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public F finalizeComputation(R val)
+ {
+ return (F) val;
+ }
+}
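
The constructor checks in BaseAveragerFactory reduce to a small rule set: the cycle size defaults to 1, both values must be positive, and the cycle size must divide the number of buckets. A rough sketch of the same rules using only the JDK follows; WindowSpecSketch and resolveCycleSize are hypothetical names, and plain IllegalArgumentException stands in for the Guava Preconditions calls.

```java
/** Hypothetical helper mirroring the argument checks above; not part of the extension. */
final class WindowSpecSketch
{
  static final int DEFAULT_CYCLE_SIZE = 1;

  /** Returns the effective cycle size, or throws if the combination is invalid. */
  static int resolveCycleSize(int numBuckets, Integer cycleSize)
  {
    // A null cycle size (property absent from the JSON spec) falls back to the default.
    int effective = (cycleSize != null) ? cycleSize : DEFAULT_CYCLE_SIZE;
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be greater than zero");
    }
    if (effective <= 0) {
      throw new IllegalArgumentException("cycleSize must be greater than zero");
    }
    // For positive values, divisibility also implies cycleSize <= numBuckets.
    if (numBuckets % effective != 0) {
      throw new IllegalArgumentException("cycleSize must divide numBuckets without a remainder");
    }
    return effective;
  }
}
```

For example, 28 buckets with a cycle size of 7 is accepted, while 10 buckets with a cycle size of 3 is rejected.
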
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ComparableAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ComparableAveragerFactory.java
new file mode 100644
index 0000000..0463d55
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ComparableAveragerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import java.util.Comparator;
+
+/**
+ * Base averager factory that supplies a default comparator based on the natural ordering of the result type.
+ *
+ * @param <R> return type
+ * @param <F> finalized type
+ */
+public abstract class ComparableAveragerFactory<R extends Comparable<R>, F> extends BaseAveragerFactory<R, F>
+{
+ /**
+ * Constructor.
+ *
+ * @param name Name of the Averager
+ * @param numBuckets Number of buckets in the analysis window
+ * @param fieldName Field from incoming events to include in the analysis
+ * @param cycleSize Cycle group size. Used to calculate day-of-week option. Default=1 (single element in group).
+ */
+ public ComparableAveragerFactory(String name, int numBuckets, String fieldName, Integer cycleSize)
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Comparator<R> getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAverager.java
new file mode 100644
index 0000000..bc76c99
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAverager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import java.util.Map;
+
+/**
+ * The constant averager. Created solely for incremental development and wiring things up.
+ */
+public class ConstantAverager implements Averager<Float>
+{
+
+ private String name;
+ private float retval;
+
+ /**
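+ * @param n the number of buckets (not used by this averager)
+ * @param name the name of the averager
+ * @param retval the constant value to return from getResult()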
+ */
+ public ConstantAverager(int n, String name, float retval)
+ {
+ this.name = name;
+ this.retval = retval;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#getResult()
+ */
+ @Override
+ public Float getResult()
+ {
+ return retval;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#getName()
+ */
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#addElement(java.util.Map, java.util.Map)
+ */
+ @Override
+ public void addElement(Map<String, Object> e, Map<String, AggregatorFactory> a)
+ {
+ // since we return a constant, no need to read from the event
+ }
+
+ /* (non-Javadoc)
+ * @see Averager#skip()
+ */
+ @Override
+ public void skip()
+ {
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAveragerFactory.java
new file mode 100644
index 0000000..45339c3
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/ConstantAveragerFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Implementation of AveragerFactory created solely for incremental development
+ */
+
+public class ConstantAveragerFactory implements AveragerFactory<Float, Float>
+{
+
+ private String name;
+ private int numBuckets;
+ private float retval;
+
+ @JsonCreator
+ public ConstantAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("retval") float retval
+ )
+ {
+ this.name = name;
+ this.numBuckets = numBuckets;
+ this.retval = retval;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ @JsonProperty("buckets")
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @JsonProperty
+ public float getRetval()
+ {
+ return retval;
+ }
+
+ @Override
+ public Averager<Float> createAverager()
+ {
+ return new ConstantAverager(numBuckets, name, retval);
+ }
+
+ @Override
+ public List<String> getDependentFields()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Comparator<Float> getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+ @Override
+ public int getCycleSize()
+ {
+ return 1;
+ }
+
+ @Override
+ public Float finalizeComputation(Float val)
+ {
+ return val;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAverager.java
new file mode 100644
index 0000000..5e25617
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAverager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMaxAverager extends BaseAverager<Number, Double>
+{
+
+ public DoubleMaxAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = Double.NEGATIVE_INFINITY;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Double.max(result, (buckets[(i + startFrom) % numBuckets]).doubleValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactory.java
new file mode 100644
index 0000000..1e82f09
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMaxAveragerFactory extends ComparableAveragerFactory<Double, Double>
+{
+
+ @JsonCreator
+ public DoubleMaxAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Double> createAverager()
+ {
+ return new DoubleMaxAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAverager.java
new file mode 100644
index 0000000..be9292c
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAverager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMeanAverager extends BaseAverager<Number, Double>
+{
+
+ public DoubleMeanAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = 0.0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).doubleValue();
+ } else {
+ result += 0.0;
+ }
+ validBuckets++;
+ }
+
+ startFrom++;
+ return result / validBuckets;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactory.java
new file mode 100644
index 0000000..58f5446
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMeanAveragerFactory extends ComparableAveragerFactory<Double, Double>
+{
+
+ @JsonCreator
+ public DoubleMeanAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Double> createAverager()
+ {
+ return new DoubleMeanAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAverager.java
new file mode 100644
index 0000000..573f12a
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAverager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMeanNoNullAverager extends BaseAverager<Number, Double>
+{
+
+ public DoubleMeanNoNullAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = 0.0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).doubleValue();
+ validBuckets++;
+ }
+ }
+
+ startFrom++;
+ return result / validBuckets;
+ }
+}
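
DoubleMeanAverager and DoubleMeanNoNullAverager differ only in how a missing bucket affects the divisor. The sketch below isolates that difference on a plain array; NullHandlingSketch and its method names are hypothetical, and the cycleSize stride and startFrom offset are deliberately left out.

```java
/** Hypothetical illustration of the two null policies; not part of the extension. */
final class NullHandlingSketch
{
  /** Mean where a missing bucket contributes 0 and still counts toward the divisor. */
  static double meanNullsAsZero(Double[] window)
  {
    double sum = 0.0;
    for (Double v : window) {
      if (v != null) {
        sum += v;
      }
    }
    return sum / window.length;
  }

  /** Mean over present buckets only; NaN if every bucket is missing. */
  static double meanIgnoringNulls(Double[] window)
  {
    double sum = 0.0;
    int present = 0;
    for (Double v : window) {
      if (v != null) {
        sum += v;
        present++;
      }
    }
    return sum / present; // 0.0 / 0 evaluates to NaN in double arithmetic
  }
}
```

For the window {2.0, null, 4.0}, the first method yields 2.0 (6.0 / 3) and the second yields 3.0 (6.0 / 2).
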
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactory.java
new file mode 100644
index 0000000..d6e1189
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMeanNoNullAveragerFactory extends ComparableAveragerFactory<Double, Double>
+{
+ @JsonCreator
+ public DoubleMeanNoNullAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Double> createAverager()
+ {
+ return new DoubleMeanNoNullAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAverager.java
new file mode 100644
index 0000000..d108fee
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAverager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class DoubleMinAverager extends BaseAverager<Number, Double>
+{
+
+ public DoubleMinAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ double result = Double.POSITIVE_INFINITY;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Double.min(result, (buckets[(i + startFrom) % numBuckets]).doubleValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+}
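Reviewer note (not part of the patch): the min/max averagers above all walk a circular bucket array with a stride of cycleSize, starting at startFrom, and skip null slots. The following is a minimal standalone sketch of that loop, assuming the buckets array and the startFrom offset are maintained by BaseAverager as the code above suggests. The class and method names are hypothetical and do not exist in the extension.

```java
// Hypothetical standalone sketch; not part of the Druid moving-average extension.
class StridedWindowSketch
{
  // Minimum over every cycleSize-th slot of a circular buffer, beginning at startFrom.
  // Null slots are skipped. Returns POSITIVE_INFINITY when no sampled slot holds a value,
  // which mirrors the initial value used by DoubleMinAverager.computeResult().
  static double stridedMin(Number[] buckets, int startFrom, int cycleSize)
  {
    double result = Double.POSITIVE_INFINITY;
    for (int i = 0; i < buckets.length; i += cycleSize) {
      Number value = buckets[(i + startFrom) % buckets.length];
      if (value != null) {
        result = Math.min(result, value.doubleValue());
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    Number[] buckets = {5.0, 1.0, null, 4.0, 3.0, 0.5};
    System.out.println(stridedMin(buckets, 0, 1)); // samples every slot
    System.out.println(stridedMin(buckets, 0, 2)); // samples slots 0, 2, 4
    System.out.println(stridedMin(buckets, 1, 2)); // samples slots 1, 3, 5
  }
}
```

With cycleSize 1 the loop degenerates to a plain window minimum; a larger cycleSize samples only slots that are a whole number of cycles apart (for example, the same weekday across several weeks).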
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactory.java
new file mode 100644
index 0000000..35a783b
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DoubleMinAveragerFactory extends ComparableAveragerFactory<Double, Double>
+{
+ @JsonCreator
+ public DoubleMinAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Double> createAverager()
+ {
+ return new DoubleMinAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
new file mode 100644
index 0000000..a45503c
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAverager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMaxAverager extends BaseAverager<Number, Long>
+{
+
+ public LongMaxAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Long computeResult()
+ {
+ long result = Long.MIN_VALUE;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Long.max(result, (buckets[(i + startFrom) % numBuckets]).longValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactory.java
new file mode 100644
index 0000000..847bbcb
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMaxAveragerFactory extends ComparableAveragerFactory<Long, Long>
+{
+ @JsonCreator
+ public LongMaxAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Long> createAverager()
+ {
+ return new LongMaxAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java
new file mode 100644
index 0000000..a5919d7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAverager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMeanAverager extends BaseAverager<Number, Double>
+{
+
+ public LongMeanAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ long result = 0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).longValue();
+ } else {
+ result += 0; // a null bucket adds nothing to the sum but still counts in the divisor below
+ }
+ validBuckets++;
+ }
+
+ startFrom++;
+ return ((double) result) / validBuckets;
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactory.java
new file mode 100644
index 0000000..d02e06d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMeanAveragerFactory extends ComparableAveragerFactory<Double, Double>
+{
+
+ @JsonCreator
+ public LongMeanAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Double> createAverager()
+ {
+ return new LongMeanAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAverager.java
new file mode 100644
index 0000000..ecdd17a
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAverager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMeanNoNullAverager extends BaseAverager<Number, Double>
+{
+
+ public LongMeanNoNullAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Double computeResult()
+ {
+ long result = 0;
+ int validBuckets = 0;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result += (buckets[(i + startFrom) % numBuckets]).longValue();
+ validBuckets++;
+ }
+ }
+
+ startFrom++;
+ return ((double) result) / validBuckets; // evaluates to NaN when every sampled bucket is null
+ }
+}
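Reviewer note (not part of the patch): LongMeanAverager and LongMeanNoNullAverager differ only in the divisor. The first divides by every sampled bucket and so treats a null as zero; the second divides by the non-null count only. The sketch below contrasts the two under simplifying assumptions: it omits the cycleSize stride and the startFrom offset, and its class and method names are hypothetical.

```java
// Hypothetical standalone sketch; not part of the Druid moving-average extension.
class MeanVariantsSketch
{
  // Mean in the style of LongMeanAverager: null buckets count as zero.
  static double meanNullAsZero(Number[] buckets)
  {
    long sum = 0;
    for (Number value : buckets) {
      if (value != null) {
        sum += value.longValue();
      }
    }
    return ((double) sum) / buckets.length;
  }

  // Mean in the style of LongMeanNoNullAverager: null buckets are ignored entirely.
  // Yields NaN (0.0 / 0) when every bucket is null.
  static double meanSkippingNulls(Number[] buckets)
  {
    long sum = 0;
    int nonNull = 0;
    for (Number value : buckets) {
      if (value != null) {
        sum += value.longValue();
        nonNull++;
      }
    }
    return ((double) sum) / nonNull;
  }

  public static void main(String[] args)
  {
    // A 7-bucket window in which only three days have data.
    Number[] window = {10L, 20L, 30L, null, null, null, null};
    System.out.println(meanNullAsZero(window));    // 60 / 7
    System.out.println(meanSkippingNulls(window)); // 60 / 3
  }
}
```

The 60 / 7 case matches the third assertion in testAveraging later in this patch, where a 7-bucket LongMeanAveragerFactory has seen page views of 10, 20 and 30 and the test expects roughly 8.571428.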
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactory.java
new file mode 100644
index 0000000..03ad7d1
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMeanNoNullAveragerFactory extends ComparableAveragerFactory<Double, Double>
+{
+
+ @JsonCreator
+ public LongMeanNoNullAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Double> createAverager()
+ {
+ return new LongMeanNoNullAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAverager.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAverager.java
new file mode 100644
index 0000000..cc999e6
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAverager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+public class LongMinAverager extends BaseAverager<Number, Long>
+{
+
+ public LongMinAverager(int numBuckets, String name, String fieldName, int cycleSize)
+ {
+ super(Number.class, numBuckets, name, fieldName, cycleSize);
+ }
+
+ @Override
+ protected Long computeResult()
+ {
+ long result = Long.MAX_VALUE;
+
+ for (int i = 0; i < numBuckets; i += cycleSize) {
+ if (buckets[(i + startFrom) % numBuckets] != null) {
+ result = Long.min(result, (buckets[(i + startFrom) % numBuckets]).longValue());
+ }
+ }
+
+ startFrom++;
+ return result;
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactory.java b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactory.java
new file mode 100644
index 0000000..ff25625
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class LongMinAveragerFactory extends ComparableAveragerFactory<Long, Long>
+{
+
+ @JsonCreator
+ public LongMinAveragerFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("buckets") int numBuckets,
+ @JsonProperty("cycleSize") Integer cycleSize,
+ @JsonProperty("fieldName") String fieldName
+ )
+ {
+ super(name, numBuckets, fieldName, cycleSize);
+ }
+
+ @Override
+ public Averager<Long> createAverager()
+ {
+ return new LongMinAverager(numBuckets, name, fieldName, cycleSize);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/moving-average-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 0000000..ec70e7d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.query.movingaverage.MovingAverageQueryModule
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
new file mode 100644
index 0000000..3acc1f7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageIterableTest.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.ConstantAveragerFactory;
+import org.apache.druid.query.movingaverage.averagers.LongMeanAveragerFactory;
+import org.joda.time.DateTime;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
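+/**
+ * Unit tests for {@link MovingAverageIterable}: row ordering, averaging, and handling of missing rows.
+ */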
+public class MovingAverageIterableTest
+{
+ private static final DateTime JAN_1 = new DateTime(2017, 1, 1, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_2 = new DateTime(2017, 1, 2, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_3 = new DateTime(2017, 1, 3, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_4 = new DateTime(2017, 1, 4, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_5 = new DateTime(2017, 1, 5, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_6 = new DateTime(2017, 1, 6, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+
+ private static final String GENDER = "gender";
+ private static final String AGE = "age";
+ private static final String COUNTRY = "country";
+
+ private static final Map<String, Object> dims1 = new HashMap<>();
+ private static final Map<String, Object> dims2 = new HashMap<>();
+ private static final Map<String, Object> dims3 = new HashMap<>();
+
+ static {
+ dims1.put(GENDER, "m");
+ dims1.put(AGE, "10");
+ dims1.put(COUNTRY, "US");
+
+ dims2.put(GENDER, "f");
+ dims2.put(AGE, "8");
+ dims2.put(COUNTRY, "US");
+
+ dims3.put(GENDER, "u");
+ dims3.put(AGE, "5");
+ dims3.put(COUNTRY, "UK");
+ }
+
+ @Test
+ public void testNext()
+ {
+
+ List<DimensionSpec> dims = Arrays.asList(
+ new DefaultDimensionSpec(GENDER, GENDER),
+ new DefaultDimensionSpec(AGE, AGE),
+ new DefaultDimensionSpec(COUNTRY, COUNTRY)
+ );
+
+ Sequence<RowBucket> dayBuckets = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(
+ new MapBasedRow(JAN_1, dims1),
+ new MapBasedRow(JAN_1, dims2)
+ )),
+ new RowBucket(JAN_2, Collections.singletonList(
+ new MapBasedRow(JAN_2, dims1)
+ )),
+ new RowBucket(JAN_3, Collections.emptyList()),
+ new RowBucket(JAN_4, Arrays.asList(
+ new MapBasedRow(JAN_4, dims2),
+ new MapBasedRow(JAN_4, dims3)
+ ))
+ ));
+
+ Iterable<Row> iterable = new MovingAverageIterable(
+ dayBuckets,
+ dims,
+ Collections.singletonList(new ConstantAveragerFactory("noop", 1, 1.1f)),
+ Collections.emptyList(),
+ Collections.emptyList()
+ );
+
+ Iterator<Row> iter = iterable.iterator();
+
+ assertTrue(iter.hasNext());
+ Row r = iter.next();
+ assertEquals(JAN_1, r.getTimestamp());
+ assertEquals("m", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_1, r.getTimestamp());
+ assertEquals("f", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_2, r.getTimestamp());
+ assertEquals("m", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_2, r.getTimestamp());
+ assertEquals("f", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ Row r2 = r;
+ assertEquals(JAN_3, r.getTimestamp());
+ assertEquals("US", r.getRaw(COUNTRY));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_3, r.getTimestamp());
+ assertEquals("US", r.getRaw(COUNTRY));
+ assertThat(r.getRaw(AGE), not(equalTo(r2.getRaw(AGE))));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_4, r.getTimestamp());
+ assertEquals("f", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_4, r.getTimestamp());
+ assertEquals("u", r.getRaw(GENDER));
+
+ assertTrue(iter.hasNext());
+ r = iter.next();
+ assertEquals(JAN_4, r.getTimestamp());
+ assertEquals("m", r.getRaw(GENDER));
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testAveraging()
+ {
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+ Map<String, Object> event3 = new HashMap<>();
+ Map<String, Object> event4 = new HashMap<>();
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_1, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_2, event2);
+
+ event3.put("gender", "m");
+ event3.put("pageViews", 30L);
+ Row row3 = new MapBasedRow(JAN_3, event3);
+
+ event4.put("gender", "f");
+ event4.put("pageViews", 40L);
+ Row row4 = new MapBasedRow(JAN_3, event4);
+
+ float retval = 14.5f;
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(row1)),
+ new RowBucket(JAN_2, Collections.singletonList(row2)),
+ new RowBucket(JAN_3, Arrays.asList(row3, row4))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Arrays.asList(
+ new ConstantAveragerFactory("costPageViews", 7, retval),
+ new LongMeanAveragerFactory("movingAvgPageViews", 7, 1, "pageViews")
+ ),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row caResult = iter.next();
+
+ assertEquals(JAN_1, caResult.getTimestamp());
+ assertEquals("m", (caResult.getDimension("gender")).get(0));
+ assertEquals(retval, caResult.getMetric("costPageViews").floatValue(), 0.0f);
+ assertEquals(1.4285715f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ caResult = iter.next();
+ assertEquals("m", (caResult.getDimension("gender")).get(0));
+ assertEquals(4.285714f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ caResult = iter.next();
+ assertEquals("m", (caResult.getDimension("gender")).get(0));
+ assertEquals(8.571428f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ caResult = iter.next();
+ assertEquals("f", (caResult.getDimension("gender")).get(0));
+ assertEquals(5.714285850f, caResult.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertFalse(iter.hasNext());
+
+ }
+
+
+ @Test
+ public void testCompleteData()
+ {
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+ Map<String, Object> event3 = new HashMap<>();
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ event2.put("gender", "f");
+ event2.put("pageViews", 20L);
+ event3.put("gender", "u");
+ event3.put("pageViews", 30L);
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1 = new MapBasedRow(JAN_1, event1);
+ Row jan1Row2 = new MapBasedRow(JAN_1, event2);
+ Row jan1Row3 = new MapBasedRow(JAN_1, event3);
+
+ Row jan2Row1 = new MapBasedRow(JAN_2, event1);
+ Row jan2Row2 = new MapBasedRow(JAN_2, event2);
+ Row jan2Row3 = new MapBasedRow(JAN_2, event3);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(jan1Row1, jan1Row2, jan1Row3)),
+ new RowBucket(JAN_2, Arrays.asList(jan2Row1, jan2Row2, jan2Row3))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 2, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+
+ }
+
+ // no injection if the data is missing at the beginning
+ @Test
+ public void testMissingDataAtBeginning()
+ {
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+ Map<String, Object> event3 = new HashMap<>();
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ event2.put("gender", "f");
+ event2.put("pageViews", 20L);
+ event3.put("gender", "u");
+ event3.put("pageViews", 30L);
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1 = new MapBasedRow(JAN_1, event1);
+
+ Row jan2Row1 = new MapBasedRow(JAN_2, event1);
+ Row jan2Row2 = new MapBasedRow(JAN_2, event2);
+ Row jan2Row3 = new MapBasedRow(JAN_2, event3);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(jan1Row1)),
+ new RowBucket(JAN_2, Arrays.asList(jan2Row1, jan2Row2, jan2Row3))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 2, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+ }
+
+ // test injection when the data is missing at the end
+ @Test
+ public void testMissingDataAtTheEnd()
+ {
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+ Map<String, Object> event3 = new HashMap<>();
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ event2.put("gender", "f");
+ event2.put("pageViews", 20L);
+ event3.put("gender", "u");
+ event3.put("pageViews", 30L);
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1 = new MapBasedRow(JAN_1, event1);
+ Row jan1Row2 = new MapBasedRow(JAN_1, event2);
+ Row jan1Row3 = new MapBasedRow(JAN_1, event3);
+ Row jan2Row1 = new MapBasedRow(JAN_2, event1);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(jan1Row1, jan1Row2, jan1Row3)),
+ new RowBucket(JAN_2, Collections.singletonList(jan2Row1))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 2, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+ }
+
+ // test injection when the data is missing in the middle
+ @Test
+ public void testMissingDataAtMiddle()
+ {
+
+ Map<String, Object> eventM = new HashMap<>();
+ Map<String, Object> eventF = new HashMap<>();
+ Map<String, Object> eventU = new HashMap<>();
+ Map<String, Object> event4 = new HashMap<>();
+
+ eventM.put("gender", "m");
+ eventM.put("pageViews", 10L);
+ eventF.put("gender", "f");
+ eventF.put("pageViews", 20L);
+ eventU.put("gender", "u");
+ eventU.put("pageViews", 30L);
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ Row jan1Row1M = new MapBasedRow(JAN_1, eventM);
+ Row jan1Row2F = new MapBasedRow(JAN_1, eventF);
+ Row jan1Row3U = new MapBasedRow(JAN_1, eventU);
+ Row jan2Row1M = new MapBasedRow(JAN_2, eventM);
+ Row jan3Row1M = new MapBasedRow(JAN_3, eventM);
+ Row jan3Row2F = new MapBasedRow(JAN_3, eventF);
+ Row jan3Row3U = new MapBasedRow(JAN_3, eventU);
+ Row jan4Row1M = new MapBasedRow(JAN_4, eventM);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Arrays.asList(jan1Row1M, jan1Row2F, jan1Row3U)),
+ new RowBucket(JAN_2, Collections.singletonList(jan2Row1M)),
+ new RowBucket(JAN_3, Arrays.asList(jan3Row1M, jan3Row2F, jan3Row3U)),
+ new RowBucket(JAN_4, Collections.singletonList(jan4Row1M))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 3, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ // Jan 1
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_1, (result.getTimestamp()));
+
+ // Jan 2
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_2, (result.getTimestamp()));
+
+ // Jan 3
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_3, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_3, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_3, (result.getTimestamp()));
+
+ // Jan 4
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_4, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("u", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_4, (result.getTimestamp()));
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("f", (result.getDimension("gender")).get(0));
+ assertEquals(JAN_4, (result.getTimestamp()));
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testMissingDaysAtBegining()
+ {
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_3, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_4, event2);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.emptyList()),
+ new RowBucket(JAN_2, Collections.emptyList()),
+ new RowBucket(JAN_3, Collections.singletonList(row1)),
+ new RowBucket(JAN_4, Collections.singletonList(row2))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 4, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(7.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testMissingDaysInMiddle()
+ {
+ System.setProperty("druid.generic.useDefaultValueForNull", "true");
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_1, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_4, event2);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(row1)),
+ new RowBucket(JAN_2, Collections.emptyList()),
+ new RowBucket(JAN_3, Collections.emptyList()),
+ new RowBucket(JAN_4, Collections.singletonList(row2))
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 4, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(7.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testWithFilteredAggregation()
+ {
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_1, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_4, event2);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(row1)),
+ new RowBucket(JAN_2, Collections.emptyList()),
+ new RowBucket(JAN_3, Collections.emptyList()),
+ new RowBucket(JAN_4, Collections.singletonList(row2))
+ ));
+
+ AveragerFactory averagerfactory = new LongMeanAveragerFactory("movingAvgPageViews", 4, 1, "pageViews");
+ AggregatorFactory aggregatorFactory = new LongSumAggregatorFactory("pageViews", "pageViews");
+ DimFilter filter = new SelectorDimFilter("gender", "m", null);
+ FilteredAggregatorFactory filteredAggregatorFactory = new FilteredAggregatorFactory(aggregatorFactory, filter);
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ averagerfactory),
+ Collections.emptyList(),
+ Collections.singletonList(
+ filteredAggregatorFactory)
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(7.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testMissingDaysAtEnd()
+ {
+ System.setProperty("druid.generic.useDefaultValueForNull", "true");
+
+ Map<String, Object> event1 = new HashMap<>();
+ Map<String, Object> event2 = new HashMap<>();
+
+ List<DimensionSpec> ds = new ArrayList<>();
+ ds.add(new DefaultDimensionSpec("gender", "gender"));
+
+ event1.put("gender", "m");
+ event1.put("pageViews", 10L);
+ Row row1 = new MapBasedRow(JAN_1, event1);
+
+ event2.put("gender", "m");
+ event2.put("pageViews", 20L);
+ Row row2 = new MapBasedRow(JAN_2, event2);
+
+ Sequence<RowBucket> seq = Sequences.simple(Arrays.asList(
+ new RowBucket(JAN_1, Collections.singletonList(row1)),
+ new RowBucket(JAN_2, Collections.singletonList(row2)),
+ new RowBucket(JAN_3, Collections.emptyList()),
+ new RowBucket(JAN_4, Collections.emptyList()),
+ new RowBucket(JAN_5, Collections.emptyList()),
+ new RowBucket(JAN_6, Collections.emptyList())
+ ));
+
+ Iterator<Row> iter = new MovingAverageIterable(seq, ds, Collections.singletonList(
+ new LongMeanAveragerFactory("movingAvgPageViews", 4, 1, "pageViews")),
+ Collections.emptyList(),
+ Collections.singletonList(new LongSumAggregatorFactory("pageViews",
+ "pageViews"
+ ))
+ ).iterator();
+
+ assertTrue(iter.hasNext());
+ Row result = iter.next();
+
+ assertEquals(JAN_1, result.getTimestamp());
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(2.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals(JAN_2, result.getTimestamp());
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(7.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals(JAN_3, result.getTimestamp());
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(7.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals(JAN_4, result.getTimestamp());
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(7.5f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals(JAN_5, result.getTimestamp());
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(5.0f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertTrue(iter.hasNext());
+ result = iter.next();
+ assertEquals(JAN_6, result.getTimestamp());
+ assertEquals("m", (result.getDimension("gender")).get(0));
+ assertEquals(0.0f, result.getMetric("movingAvgPageViews").floatValue(), 0.0f);
+
+ assertFalse(iter.hasNext());
+
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
new file mode 100644
index 0000000..a7e1eb7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.Maps;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.TimelineServerView;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.guice.DruidProcessingModule;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.QueryRunnerFactoryModule;
+import org.apache.druid.guice.QueryableModule;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.common.guava.Accumulators;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.RetryQueryRunnerConfig;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.movingaverage.test.TestConfig;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.server.ClientQuerySegmentWalker;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.timeline.TimelineLookup;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Parameterized tests for MovingAverageQuery; each parameter names a YAML test case under /queryTests
+ */
+@RunWith(Parameterized.class)
+public class MovingAverageQueryTest
+{
+ private final ObjectMapper jsonMapper;
+ private final Injector injector;
+ private final QueryToolChestWarehouse warehouse;
+ private final RetryQueryRunnerConfig retryConfig;
+ private final ServerConfig serverConfig;
+
+ private final List<Row> groupByResults = new ArrayList<>();
+ private final List<Result<TimeseriesResultValue>> timeseriesResults = new ArrayList<>();
+
+ private final TestConfig config;
+ private final String yamlFile;
+
+ @Parameters(name = "{0}")
+ public static Iterable<String[]> data() throws IOException
+ {
+ BufferedReader testReader = new BufferedReader(
+ new InputStreamReader(MovingAverageQueryTest.class.getResourceAsStream("/queryTests"), StandardCharsets.UTF_8));
+ List<String[]> tests = new ArrayList<>();
+
+ for (String line = testReader.readLine(); line != null; line = testReader.readLine()) {
+ tests.add(new String[] {line});
+ }
+
+ return tests;
+ }
+
+ public MovingAverageQueryTest(String yamlFile) throws IOException
+ {
+ this.yamlFile = yamlFile;
+
+ List<Module> modules = getRequiredModules();
+ modules.add(
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("queryTest");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(1);
+ binder.bind(QuerySegmentWalker.class).toProvider(Providers.of(null));
+ }
+ );
+
+ System.setProperty("druid.generic.useDefaultValueForNull", "true");
+ System.setProperty("druid.processing.buffer.sizeBytes", "655360");
+ Injector baseInjector = GuiceInjectors.makeStartupInjector();
+ injector = Initialization.makeInjectorWithModules(baseInjector, modules);
+
+ jsonMapper = injector.getInstance(ObjectMapper.class);
+ warehouse = injector.getInstance(QueryToolChestWarehouse.class);
+ retryConfig = injector.getInstance(RetryQueryRunnerConfig.class);
+ serverConfig = injector.getInstance(ServerConfig.class);
+
+ InputStream is = getClass().getResourceAsStream("/queryTests/" + yamlFile);
+ ObjectMapper reader = new ObjectMapper(new YAMLFactory());
+ config = reader.readValue(is, TestConfig.class);
+ }
+
+ /**
+ * Returns the JSON query that should be used in the test.
+ *
+ * @return The JSON query
+ */
+ protected String getQueryString()
+ {
+ return config.query.toString();
+ }
+
+ /**
+ * Returns the JSON result that should be expected from the query.
+ *
+ * @return The JSON result
+ */
+ protected String getExpectedResultString()
+ {
+ return config.expectedOutput.toString();
+ }
+
+ /**
+ * Returns the JSON result that the nested groupby query should produce.
+ * Either this result or that of {@link #getTimeseriesResultJson()} must be defined
+ * in the intermediate results of the YAML test config.
+ *
+ * @return The JSON result from the groupby query
+ */
+ protected String getGroupByResultJson()
+ {
+ ArrayNode node = config.intermediateResults.get("groupBy");
+ return node == null ? null : node.toString();
+ }
+
+ /**
+ * Returns the JSON result that the nested timeseries query should produce.
+ * Either this result or that of {@link #getGroupByResultJson()} must be defined
+ * in the intermediate results of the YAML test config.
+ *
+ * @return The JSON result from the timeseries query
+ */
+ protected String getTimeseriesResultJson()
+ {
+ ArrayNode node = config.intermediateResults.get("timeseries");
+ return node == null ? null : node.toString();
+ }
+
+ /**
+ * Returns the expected query type.
+ *
+ * @return The Query type
+ */
+ protected Class<?> getExpectedQueryType()
+ {
+ return MovingAverageQuery.class;
+ }
+
+ protected TypeReference<?> getExpectedResultType()
+ {
+ return new TypeReference<List<Row>>()
+ {
+ };
+ }
+
+ /**
+ * Returns a list of any additional Druid Modules necessary to run the test.
+ *
+ * @return List of Druid Modules
+ */
+ protected List<Module> getRequiredModules()
+ {
+ List<Module> list = new ArrayList<>();
+
+ list.add(new QueryRunnerFactoryModule());
+ list.add(new QueryableModule());
+ list.add(new DruidProcessingModule());
+
+ return list;
+ }
+
+ /**
+ * Set up any needed mocks to stub out backend query behavior.
+ *
+ * @throws IOException
+ * @throws JsonMappingException
+ * @throws JsonParseException
+ */
+ protected void defineMocks() throws IOException
+ {
+ groupByResults.clear();
+ timeseriesResults.clear();
+
+ if (getGroupByResultJson() != null) {
+ groupByResults.addAll(jsonMapper.readValue(getGroupByResultJson(), new TypeReference<List<Row>>()
+ {
+ }));
+ }
+
+ if (getTimeseriesResultJson() != null) {
+ timeseriesResults.addAll(jsonMapper.readValue(
+ getTimeseriesResultJson(),
+ new TypeReference<List<Result<TimeseriesResultValue>>>()
+ {
+ }
+ ));
+ }
+ }
+
+ /**
+ * Converts Integer values to Long and Float values to Double so that actual and expected rows compare equal.
+ *
+ * @param result the rows to normalize; a new list of converted rows is returned
+ */
+ protected List<MapBasedRow> consistentTypeCasting(List<MapBasedRow> result)
+ {
+ List<MapBasedRow> newResult = new ArrayList<>();
+ for (MapBasedRow row : result) {
+ final Map<String, Object> event = Maps.newLinkedHashMap((row).getEvent());
+ event.forEach((key, value) -> {
+ if (Integer.class.isInstance(value)) {
+ event.put(key, ((Integer) value).longValue());
+ }
+ if (Float.class.isInstance(value)) {
+ event.put(key, ((Float) value).doubleValue());
+ }
+ });
+ newResult.add(new MapBasedRow(row.getTimestamp(), event));
+ }
+
+ return newResult;
+ }
+
+ /**
+ * Validate that the specified query behaves correctly.
+ *
+ * @throws IOException
+ * @throws JsonMappingException
+ * @throws JsonParseException
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Test
+ public void testQuery() throws IOException
+ {
+
+
+ // create mocks for nested queries
+ @SuppressWarnings("unused")
+
+ MockUp<GroupByQuery> groupByQuery = new MockUp<GroupByQuery>()
+ {
+ @Mock
+ public QueryRunner getRunner(QuerySegmentWalker walker)
+ {
+ return new QueryRunner()
+ {
+ @Override
+ public Sequence run(QueryPlus queryPlus, Map responseContext)
+ {
+ return Sequences.simple(groupByResults);
+ }
+ };
+ }
+ };
+
+
+ @SuppressWarnings("unused")
+ MockUp<TimeseriesQuery> timeseriesQuery = new MockUp<TimeseriesQuery>()
+ {
+ @Mock
+ public QueryRunner getRunner(QuerySegmentWalker walker)
+ {
+ return new QueryRunner()
+ {
+ @Override
+ public Sequence run(QueryPlus queryPlus, Map responseContext)
+ {
+ return Sequences.simple(timeseriesResults);
+ }
+ };
+ }
+ };
+
+
+ Query<?> query = jsonMapper.readValue(getQueryString(), Query.class);
+ assertThat(query, IsInstanceOf.instanceOf(getExpectedQueryType()));
+
+ List<MapBasedRow> expectedResults = jsonMapper.readValue(getExpectedResultString(), getExpectedResultType());
+ assertNotNull(expectedResults);
+ assertThat(expectedResults, IsInstanceOf.instanceOf(List.class));
+
+ CachingClusteredClient baseClient = new CachingClusteredClient(
+ warehouse,
+ new TimelineServerView()
+ {
+ @Override
+ public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
+ {
+ return null;
+ }
+
+ @Override
+ public List<ImmutableDruidServer> getDruidServers()
+ {
+ return null;
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunner(DruidServer server)
+ {
+ return null;
+ }
+
+ @Override
+ public void registerTimelineCallback(Executor exec, TimelineCallback callback)
+ {
+
+ }
+
+ @Override
+ public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+ {
+
+ }
+
+ @Override
+ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
+ {
+
+ }
+ },
+ MapCache.create(100000),
+ jsonMapper,
+ new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1),
+ new CacheConfig(),
+ new DruidHttpClientConfig()
+ {
+ @Override
+ public long getMaxQueuedBytes()
+ {
+ return 0L;
+ }
+ }
+ );
+
+ ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
+ new ServiceEmitter("", "", null)
+ {
+ @Override
+ public void emit(Event event) {}
+ },
+ baseClient, warehouse, retryConfig, jsonMapper, serverConfig, null, new CacheConfig()
+ );
+ final Map<String, Object> responseContext = new HashMap<>();
+
+ defineMocks();
+
+ QueryPlus queryPlus = QueryPlus.wrap(query);
+ final Sequence<?> res = query.getRunner(walker).run(queryPlus, responseContext);
+
+ List actualResults = new ArrayList();
+ actualResults = (List<MapBasedRow>) res.accumulate(actualResults, Accumulators.list());
+
+ expectedResults = consistentTypeCasting(expectedResults);
+ actualResults = consistentTypeCasting(actualResults);
+
+ assertEquals(expectedResults, actualResults);
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculatorTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculatorTest.java
new file mode 100644
index 0000000..51c7077
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/PostAveragerAggregatorCalculatorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.query.movingaverage.averagers.DoubleMeanAveragerFactory;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+/**
+ * Unit tests for PostAveragerAggregatorCalculator
+ */
+public class PostAveragerAggregatorCalculatorTest
+{
+
+ private MovingAverageQuery query;
+ private PostAveragerAggregatorCalculator pac;
+ private Map<String, Object> event;
+ private MapBasedRow row;
+
+ @Before
+ public void setup()
+ {
+ System.setProperty("druid.generic.useDefaultValueForNull", "true");
+ query = new MovingAverageQuery(
+ new TableDataSource("d"),
+ new MultipleIntervalSegmentSpec(Collections.singletonList(new Interval("2017-01-01/2017-01-01", ISOChronology.getInstanceUTC()))),
+ null,
+ Granularities.DAY,
+ null,
+ Collections.singletonList(new CountAggregatorFactory("count")),
+ Collections.emptyList(),
+ null,
+ Collections.singletonList(new DoubleMeanAveragerFactory("avgCount", 7, 1, "count")),
+ Collections.singletonList(new ArithmeticPostAggregator(
+ "avgCountRatio",
+ "/",
+ Arrays.asList(
+ new FieldAccessPostAggregator("count", "count"),
+ new FieldAccessPostAggregator("avgCount", "avgCount")
+ )
+ )),
+ null,
+ null
+ );
+
+ pac = new PostAveragerAggregatorCalculator(query);
+ event = new HashMap<>();
+ row = new MapBasedRow(new DateTime(ISOChronology.getInstanceUTC()), event);
+ }
+
+ @Test
+ public void testApply()
+ {
+ event.put("count", new Double(10.0));
+ event.put("avgCount", new Double(12.0));
+
+ Row result = pac.apply(row);
+
+ assertEquals(10.0f / 12.0f, result.getMetric("avgCountRatio").floatValue(), 0.0);
+ }
+
+ @Test
+ public void testApplyMissingColumn()
+ {
+ event.put("count", Double.valueOf(10.0));
+
+ Row result = pac.apply(row);
+
+ assertEquals(0.0, result.getMetric("avgCountRatio").floatValue(), 0.0);
+ assertNull(result.getRaw("avgCountRatio"));
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/RowBucketIterableTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/RowBucketIterableTest.java
new file mode 100644
index 0000000..7504a97
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/RowBucketIterableTest.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RowBucketIterableTest
+{
+
+ private static final DateTime JAN_1 = new DateTime(2017, 1, 1, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_2 = new DateTime(2017, 1, 2, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_3 = new DateTime(2017, 1, 3, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_4 = new DateTime(2017, 1, 4, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_5 = new DateTime(2017, 1, 5, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_6 = new DateTime(2017, 1, 6, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+ private static final DateTime JAN_9 = new DateTime(2017, 1, 9, 0, 0, 0, 0, ISOChronology.getInstanceUTC());
+
+ private static final Map<String, Object> EVENT_M_10 = new HashMap<>();
+ private static final Map<String, Object> EVENT_F_20 = new HashMap<>();
+ private static final Map<String, Object> EVENT_U_30 = new HashMap<>();
+
+ private static final Row JAN_1_M_10 = new MapBasedRow(new DateTime(2017, 1, 1, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_M_10);
+ private static final Row JAN_1_F_20 = new MapBasedRow(new DateTime(2017, 1, 1, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_F_20);
+ private static final Row JAN_1_U_30 = new MapBasedRow(new DateTime(2017, 1, 1, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_U_30);
+ private static final Row JAN_2_M_10 = new MapBasedRow(new DateTime(2017, 1, 2, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_M_10);
+ private static final Row JAN_3_M_10 = new MapBasedRow(new DateTime(2017, 1, 3, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_M_10);
+ private static final Row JAN_3_F_20 = new MapBasedRow(new DateTime(2017, 1, 3, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_F_20);
+ private static final Row JAN_4_M_10 = new MapBasedRow(new DateTime(2017, 1, 4, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_M_10);
+ private static final Row JAN_4_F_20 = new MapBasedRow(new DateTime(2017, 1, 4, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_F_20);
+ private static final Row JAN_4_U_30 = new MapBasedRow(new DateTime(2017, 1, 4, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_U_30);
+ private static final Row JAN_5_M_10 = new MapBasedRow(new DateTime(2017, 1, 5, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_M_10);
+ private static final Row JAN_6_M_10 = new MapBasedRow(new DateTime(2017, 1, 6, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_M_10);
+ private static final Row JAN_7_F_20 = new MapBasedRow(new DateTime(2017, 1, 7, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_F_20);
+ private static final Row JAN_8_U_30 = new MapBasedRow(new DateTime(2017, 1, 8, 0, 0, 0, 0, ISOChronology.getInstanceUTC()), EVENT_U_30);
+
+ private static final Interval INTERVAL_JAN_1_1 = new Interval(JAN_1, JAN_2);
+ private static final Interval INTERVAL_JAN_1_2 = new Interval(JAN_1, JAN_3);
+ private static final Interval INTERVAL_JAN_1_4 = new Interval(JAN_1, JAN_5);
+ private static final Interval INTERVAL_JAN_1_5 = new Interval(JAN_1, JAN_6);
+ private static final Interval INTERVAL_JAN_6_8 = new Interval(JAN_6, JAN_9);
+ private static final Period ONE_DAY = Period.days(1);
+
+ private List<Row> rows = null;
+ private List<Interval> intervals = new ArrayList<>();
+
+ @BeforeClass
+ public static void setupClass()
+ {
+ EVENT_M_10.put("gender", "m");
+ EVENT_M_10.put("pageViews", 10L);
+ EVENT_F_20.put("gender", "f");
+ EVENT_F_20.put("pageViews", 20L);
+ EVENT_U_30.put("gender", "u");
+ EVENT_U_30.put("pageViews", 30L);
+ }
+
+ // normal case: data present for all the days
+ @Test
+ public void testCompleteData()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_M_10);
+ rows.add(JAN_4_M_10);
+
+ List<Row> expectedDay1 = Collections.singletonList(JAN_1_M_10);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_M_10);
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // all days present and last day only has one row
+ @Test
+ public void testApplyLastDaySingleRow()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ List<Row> expectedDay1 = Arrays.asList(JAN_1_M_10, JAN_1_F_20);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_F_20);
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_1_F_20);
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_F_20);
+ rows.add(JAN_4_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // all days present and last day has multiple rows
+ @Test
+ public void testApplyLastDayMultipleRows()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ List<Row> expectedDay1 = Arrays.asList(JAN_1_M_10, JAN_1_F_20);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_F_20);
+ List<Row> expectedDay4 = Arrays.asList(JAN_4_M_10, JAN_4_F_20, JAN_4_U_30);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_1_F_20);
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_F_20);
+ rows.add(JAN_4_M_10);
+ rows.add(JAN_4_F_20);
+ rows.add(JAN_4_U_30);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // test single day with single row
+ @Test
+ public void testSingleDaySingleRow()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_1);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+
+ List<Row> expectedDay1 = Collections.singletonList(JAN_1_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+ assertEquals(JAN_1, actual.getDateTime());
+
+ }
+
+ // test single day with multiple rows
+ @Test
+ public void testSingleDayMultipleRow()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_1);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_1_F_20);
+ rows.add(JAN_1_U_30);
+
+ List<Row> expectedDay1 = Arrays.asList(JAN_1_M_10, JAN_1_F_20, JAN_1_U_30);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ }
+
+ // missing day at the beginning followed by single row
+ @Test
+ public void testMissingDaysAtBegining()
+ {
+
+ List<Row> expectedDay1 = Collections.emptyList();
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_2);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_2_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ }
+
+ // missing day at the beginning followed by multiple rows
+ @Test
+ public void testMissingDaysAtBeginingFollowedByMultipleRow()
+ {
+
+ List<Row> expectedDay1 = Collections.emptyList();
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_M_10);
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_M_10);
+ rows.add(JAN_4_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // missing day at the beginning and at the end
+ @Test
+ public void testMissingDaysAtBeginingAndAtTheEnd()
+ {
+
+ List<Row> expectedDay1 = Collections.emptyList();
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_M_10);
+ List<Row> expectedDay4 = Collections.emptyList();
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // multiple missing days in an interval
+ @Test
+ public void testMultipleMissingDays()
+ {
+
+ List<Row> expectedDay1 = Collections.emptyList();
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.emptyList();
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_4_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // multiple missing days in an interval followed by multiple rows at the end
+ @Test
+ public void testMultipleMissingDaysMultipleRowAtTheEnd()
+ {
+
+ List<Row> expectedDay1 = Collections.emptyList();
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.emptyList();
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+ List<Row> expectedDay5 = Collections.singletonList(JAN_5_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_5);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_4_M_10);
+ rows.add(JAN_5_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_5, actual.getDateTime());
+ assertEquals(expectedDay5, actual.getRows());
+ }
+
+
+ // missing day in the middle followed by single row
+ @Test
+ public void testMissingDaysInMiddleOneRow()
+ {
+
+ List<Row> expectedDay1 = Collections.singletonList(JAN_1_M_10);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.emptyList();
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_4_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay4, actual.getRows());
+
+ }
+
+ // missing day in the middle followed by multiple rows
+ @Test
+ public void testMissingDaysInMiddleMultipleRow()
+ {
+
+ List<Row> expectedDay1 = Collections.singletonList(JAN_1_M_10);
+ List<Row> expectedDay2 = Collections.emptyList();
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_M_10);
+ List<Row> expectedDay4 = Collections.singletonList(JAN_4_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_3_M_10);
+ rows.add(JAN_4_M_10);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(JAN_1, actual.getDateTime());
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_2, actual.getDateTime());
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+
+ }
+
+ // data missing for last day
+ @Test
+ public void testApplyLastDayNoRows()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ List<Row> expectedDay1 = Arrays.asList(JAN_1_M_10, JAN_1_F_20);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_F_20);
+ List<Row> expectedDay4 = Collections.emptyList();
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_1_F_20);
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_F_20);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+ // data missing for last two days
+ @Test
+ public void testApplyLastTwoDayNoRows()
+ {
+
+ List<Row> expectedDay1 = Arrays.asList(JAN_1_M_10, JAN_1_F_20);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.emptyList();
+ List<Row> expectedDay4 = Collections.emptyList();
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_1_F_20);
+ rows.add(JAN_2_M_10);
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_3, actual.getDateTime());
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(JAN_4, actual.getDateTime());
+ assertEquals(expectedDay4, actual.getRows());
+ }
+
+
+ @Test
+ public void testApplyMultipleInterval()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+ intervals.add(INTERVAL_JAN_6_8);
+
+ List<Row> expectedDay1 = Arrays.asList(JAN_1_M_10, JAN_1_F_20);
+ List<Row> expectedDay2 = Collections.singletonList(JAN_2_M_10);
+ List<Row> expectedDay3 = Collections.singletonList(JAN_3_F_20);
+ List<Row> expectedDay4 = Arrays.asList(JAN_4_M_10, JAN_4_F_20, JAN_4_U_30);
+ List<Row> expectedDay6 = Collections.singletonList(JAN_6_M_10);
+ List<Row> expectedDay7 = Collections.singletonList(JAN_7_F_20);
+ List<Row> expectedDay8 = Collections.singletonList(JAN_8_U_30);
+
+ rows = new ArrayList<>();
+ rows.add(JAN_1_M_10);
+ rows.add(JAN_1_F_20);
+ rows.add(JAN_2_M_10);
+ rows.add(JAN_3_F_20);
+ rows.add(JAN_4_M_10);
+ rows.add(JAN_4_F_20);
+ rows.add(JAN_4_U_30);
+ rows.add(JAN_6_M_10);
+ rows.add(JAN_7_F_20);
+ rows.add(JAN_8_U_30);
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ RowBucket actual = iter.next();
+ assertEquals(expectedDay1, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay2, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay3, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay4, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay6, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay7, actual.getRows());
+
+ actual = iter.next();
+ assertEquals(expectedDay8, actual.getRows());
+ }
+
+ @Test
+ public void testNodata()
+ {
+
+ intervals = new ArrayList<>();
+ intervals.add(INTERVAL_JAN_1_4);
+ intervals.add(INTERVAL_JAN_6_8);
+
+ rows = new ArrayList<>();
+
+ Sequence<Row> seq = Sequences.simple(rows);
+ RowBucketIterable rbi = new RowBucketIterable(seq, intervals, ONE_DAY);
+ Iterator<RowBucket> iter = rbi.iterator();
+
+ assertTrue(iter.hasNext());
+ RowBucket actual = iter.next();
+ assertEquals(Collections.emptyList(), actual.getRows());
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactoryTest.java
new file mode 100644
index 0000000..98104e6
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerFactoryTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class BaseAveragerFactoryTest
+{
+
+ private AveragerFactory<Long, Long> fac;
+
+ @Before
+ public void setup()
+ {
+ fac = new BaseAveragerFactory<Long, Long>("test", 5, "field", 1)
+ {
+ @Override
+ public Averager<Long> createAverager()
+ {
+ return null;
+ }
+
+ @Override
+ public Comparator<Long> getComparator()
+ {
+ return null;
+ }
+ };
+ }
+
+ @Test
+ public void testGetDependentFields()
+ {
+ List<String> dependentFields = fac.getDependentFields();
+ assertEquals(1, dependentFields.size());
+ assertEquals("field", dependentFields.get(0));
+ }
+
+ @Test
+ public void testFinalization()
+ {
+ Long input = Long.valueOf(5L);
+ assertEquals(input, fac.finalizeComputation(input));
+ }
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerTest.java
new file mode 100644
index 0000000..c6e960d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/BaseAveragerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test class for BaseAverager
+ */
+public class BaseAveragerTest
+{
+
+ public static class TestAverager extends BaseAverager<Integer, Integer>
+ {
+ public TestAverager(Class<Integer> clazz, int b, String name, String field, int cycleSize)
+ {
+ super(clazz, b, name, field, cycleSize);
+ }
+
+ @Override
+ protected Integer computeResult()
+ {
+ return 1;
+ }
+ }
+
+ @Test
+ public void testBaseAverager()
+ {
+ BaseAverager<Integer, Integer> avg = new TestAverager(Integer.class, 5, "test", "field", 1);
+
+ assertEquals("test", avg.getName());
+ assertEquals(5, avg.getNumBuckets());
+ assertEquals(5, avg.getBuckets().length);
+ assertTrue(avg.getBuckets().getClass().isArray());
+ }
+
+ @Test
+ public void testAddElement()
+ {
+ BaseAverager<Integer, Integer> avg = new TestAverager(Integer.class, 3, "test", "field", 1);
+ Object[] buckets = avg.getBuckets();
+
+ avg.addElement(Collections.singletonMap("field", 1), Collections.emptyMap());
+ assertEquals(Integer.valueOf(1), buckets[0]);
+ assertNull(buckets[1]);
+ assertNull(buckets[2]);
+
+ avg.addElement(Collections.singletonMap("field", 2), Collections.emptyMap());
+ assertEquals(Integer.valueOf(1), buckets[0]);
+ assertEquals(Integer.valueOf(2), buckets[1]);
+ assertNull(buckets[2]);
+
+ avg.addElement(Collections.singletonMap("field", 3), Collections.emptyMap());
+ assertEquals(Integer.valueOf(1), buckets[0]);
+ assertEquals(Integer.valueOf(2), buckets[1]);
+ assertEquals(Integer.valueOf(3), buckets[2]);
+
+ avg.addElement(Collections.singletonMap("field", 4), Collections.emptyMap());
+ assertEquals(Integer.valueOf(4), buckets[0]);
+ assertEquals(Integer.valueOf(2), buckets[1]);
+ assertEquals(Integer.valueOf(3), buckets[2]);
+ }
+
+ @Test
+ public void testSkip()
+ {
+ BaseAverager<Integer, Integer> avg = new TestAverager(Integer.class, 3, "test", "field", 1);
+ Object[] buckets = avg.getBuckets();
+
+ avg.addElement(Collections.singletonMap("field", 1), Collections.emptyMap());
+ avg.addElement(Collections.singletonMap("field", 1), Collections.emptyMap());
+ avg.addElement(Collections.singletonMap("field", 1), Collections.emptyMap());
+
+ assertEquals(Integer.valueOf(1), buckets[0]);
+ assertEquals(Integer.valueOf(1), buckets[1]);
+ assertEquals(Integer.valueOf(1), buckets[2]);
+
+ avg.skip();
+ assertNull(buckets[0]);
+ assertNotNull(buckets[1]);
+ assertNotNull(buckets[2]);
+
+ avg.skip();
+ assertNull(buckets[0]);
+ assertNull(buckets[1]);
+ assertNotNull(buckets[2]);
+
+ avg.skip();
+ assertNull(buckets[0]);
+ assertNull(buckets[1]);
+ assertNull(buckets[2]);
+
+ // poke some test data into the array
+ buckets[0] = Integer.valueOf(1);
+
+ avg.skip();
+ assertNull(buckets[0]);
+ assertNull(buckets[1]);
+ assertNull(buckets[2]);
+ }
+
+ @Test
+ public void testHasData()
+ {
+ BaseAverager<Integer, Integer> avg = new TestAverager(Integer.class, 3, "test", "field", 1);
+
+ assertFalse(avg.hasData());
+
+ avg.addElement(Collections.singletonMap("field", 1), Collections.emptyMap());
+ assertTrue(avg.hasData());
+
+ avg.skip();
+ avg.skip();
+ avg.skip();
+
+ assertFalse(avg.hasData());
+ }
+
+ @Test
+ public void testGetResult()
+ {
+ BaseAverager<Integer, Integer> avg = new TestAverager(Integer.class, 3, "test", "field", 1);
+
+ assertNull(avg.getResult());
+
+ avg.addElement(Collections.singletonMap("field", 1), Collections.emptyMap());
+ assertEquals(Integer.valueOf(1), avg.getResult());
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactoryTest.java
new file mode 100644
index 0000000..773cae4
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerFactoryTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+
+public class DoubleMaxAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new DoubleMaxAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(DoubleMaxAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerTest.java
new file mode 100644
index 0000000..e1ba10f
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMaxAveragerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoubleMaxAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> avg = new DoubleMaxAverager(3, "test", "field", 1);
+
+ assertEquals(Double.NEGATIVE_INFINITY, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", -1.1e100), new HashMap<>());
+ assertEquals(-1.1e100, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ assertEquals(1.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(1)), new HashMap<>());
+ assertEquals(1.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 5.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ assertEquals(5.0, avg.computeResult(), 0.0);
+
+ avg.skip();
+ assertEquals(3.0, avg.computeResult(), 0.0);
+ }
+
+}
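Reviewer note on the sequence DoubleMaxAveragerTest walks through: the expected values are consistent with a fixed-size ring buffer in which skip() overwrites the oldest slot with an empty value. The sketch below is only an illustration under that assumption; WindowMaxSketch and its methods are hypothetical names and are not part of the extension.

```java
// Hypothetical illustration, not the extension's DoubleMaxAverager.
final class WindowMaxSketch
{
  private final Double[] buckets; // null marks an empty slot
  private int next = 0;           // index of the slot to overwrite next

  WindowMaxSketch(int size)
  {
    buckets = new Double[size];
  }

  /** Stores a value in the oldest slot and advances the write position. */
  void add(double value)
  {
    buckets[next] = value;
    next = (next + 1) % buckets.length;
  }

  /** Empties the oldest slot, as if a time bucket had no data. */
  void skip()
  {
    buckets[next] = null;
    next = (next + 1) % buckets.length;
  }

  /** Maximum over the non-empty slots; negative infinity if all are empty. */
  double max()
  {
    double result = Double.NEGATIVE_INFINITY;
    for (Double b : buckets) {
      if (b != null) {
        result = Math.max(result, b);
      }
    }
    return result;
  }
}
```

With a window of 3, adding 5.0, 3.0 and 2.0 and then calling skip() leaves 3.0 and 2.0 in the window, which is why the last assertion in the test expects 3.0.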
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactoryTest.java
new file mode 100644
index 0000000..68d9b67
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class DoubleMeanAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new DoubleMeanAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(DoubleMeanAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerTest.java
new file mode 100644
index 0000000..0d5f2c7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoubleMeanAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> avg = new DoubleMeanAverager(3, "test", "field", 1);
+
+ assertEquals(0.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(1.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(0)), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.skip();
+ assertEquals(4.0 / 3, avg.computeResult(), 0.0);
+
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerWithPeriodTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerWithPeriodTest.java
new file mode 100644
index 0000000..8cde307
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanAveragerWithPeriodTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoubleMeanAveragerWithPeriodTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> averager = new DoubleMeanAverager(14, "test", "field", 7);
+
+ averager.addElement(Collections.singletonMap("field", 7.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 4.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 5.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 6.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 7.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 4.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 5.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 6.0), new HashMap<>());
+
+ assertEquals(7, averager.computeResult(), 0.0); // (7+7)/2
+
+ averager.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(1, averager.computeResult(), 0.0); // (1+1)/2
+
+ BaseAverager<Number, Double> averager1 = new DoubleMeanAverager(14, "test", "field", 3);
+
+ averager1.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ averager1.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
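+ // Each computeResult() call appears to advance the cycle offset, hence the differing results below.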
+ assertEquals(1, averager1.computeResult(), 0.0); // (1+1+1+1+1)/5
+
+ assertEquals(2, averager1.computeResult(), 0.0); // (2+2+2+2+2)/5
+
+ assertEquals(13.0 / 5, averager1.computeResult(), 0.0); // (3+3+3+3+1)/5
+
+ }
+}
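Reviewer note on the arithmetic in the comments of DoubleMeanAveragerWithPeriodTest: the expected values match a mean taken over every cycleSize-th slot of the window, starting at an offset and wrapping around. The sketch below reproduces that arithmetic only. CycleMeanSketch, cycleMean and the startFrom parameter are hypothetical names, and dividing by the number of visited slots is an assumption inferred from the "/5" in the test comments, not a statement about how DoubleMeanAverager is implemented.

```java
// Hypothetical illustration of the cycle arithmetic, not the extension's code.
final class CycleMeanSketch
{
  /**
   * Mean of every cycleSize-th slot of a ring buffer, starting at startFrom
   * and wrapping around; divides by the number of slots actually visited.
   */
  static double cycleMean(double[] window, int cycleSize, int startFrom)
  {
    double sum = 0.0;
    int visited = 0;
    for (int i = 0; i < window.length; i += cycleSize) {
      sum += window[(i + startFrom) % window.length];
      visited++;
    }
    return sum / visited;
  }

  public static void main(String[] args)
  {
    // 14 slots, cycle of 7: slots 0 and 7 are averaged, (7+7)/2
    double[] weekly = {7, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6};
    System.out.println(cycleMean(weekly, 7, 0));

    // 14 slots, cycle of 3: offsets 0, 1 and 2 visit five slots each
    double[] triples = {1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2};
    System.out.println(cycleMean(triples, 3, 0)); // slots 0,3,6,9,12
    System.out.println(cycleMean(triples, 3, 1)); // slots 1,4,7,10,13
    System.out.println(cycleMean(triples, 3, 2)); // slots 2,5,8,11,0
  }
}
```

At offset 2 the fifth visited slot wraps around to index 0, which holds 1.0; that is where the trailing 1 in (3+3+3+3+1)/5 comes from.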
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactoryTest.java
new file mode 100644
index 0000000..9359fc2
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class DoubleMeanNoNullAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new DoubleMeanNoNullAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(DoubleMeanNoNullAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerTest.java
new file mode 100644
index 0000000..6d946e4
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMeanNoNullAveragerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoubleMeanNoNullAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> avg = new DoubleMeanNoNullAverager(3, "test", "field", 1);
+
+ assertEquals(Double.NaN, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(3.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(3.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(0)), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.skip();
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ // testing cycleSize functionality
+ BaseAverager<Number, Double> averager = new DoubleMeanNoNullAverager(14, "test", "field", 7);
+
+ averager.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ assertEquals(2.0, averager.computeResult(), 0.0);
+
+ averager.addElement(Collections.singletonMap("field", 4.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 5.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 6.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 7.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 8.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 9.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", null), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 11.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 12.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 13.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 14.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 15.0), new HashMap<>());
+ averager.addElement(Collections.singletonMap("field", 16.0), new HashMap<>());
+
+ assertEquals(7.5, averager.computeResult(), 0.0);
+
+ averager.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(8.5, averager.computeResult(), 0.0);
+ }
+
+}
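Reviewer note on how the expectations in DoubleMeanAveragerTest and DoubleMeanNoNullAveragerTest differ: the first set is consistent with empty slots counting as zero while still contributing to the divisor, the second with empty slots being left out of both the sum and the count. The sketch below shows the two rules side by side. NullHandlingMeanSketch and its method names are hypothetical, and the sketch assumes a plain array with null for an empty slot.

```java
// Hypothetical illustration of the two null-handling rules, not the extension's code.
final class NullHandlingMeanSketch
{
  /** Mean over all slots; an empty (null) slot adds 0 but still counts. */
  static double meanNullAsZero(Double[] window)
  {
    double sum = 0.0;
    for (Double v : window) {
      if (v != null) {
        sum += v;
      }
    }
    return sum / window.length;
  }

  /** Mean over non-null slots only; NaN when every slot is empty. */
  static double meanSkippingNulls(Double[] window)
  {
    double sum = 0.0;
    int count = 0;
    for (Double v : window) {
      if (v != null) {
        sum += v;
        count++;
      }
    }
    return count == 0 ? Double.NaN : sum / count;
  }
}
```

For a window of 3 holding a single 3.0, the first rule gives 1.0 and the second gives 3.0, matching the first assertion after addElement in each test. For a window holding 2.0, 2.0 and one empty slot, the results are 4.0 / 3 and 2.0, matching the assertions that follow skip().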
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactoryTest.java
new file mode 100644
index 0000000..ef2bb6f
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class DoubleMinAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new DoubleMinAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(DoubleMinAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerTest.java
new file mode 100644
index 0000000..02fd2c2
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/DoubleMinAveragerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoubleMinAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> avg = new DoubleMinAverager(3, "test", "field", 1);
+
+ assertEquals(Double.POSITIVE_INFINITY, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", -1.1e100), new HashMap<>());
+ assertEquals(-1.1e100, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 1.0), new HashMap<>());
+ assertEquals(-1.1e100, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(1)), new HashMap<>());
+ assertEquals(-1.1e100, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 5.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2.0), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 3.0), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.skip();
+ avg.skip();
+ assertEquals(3.0, avg.computeResult(), 0.0);
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactoryTest.java
new file mode 100644
index 0000000..7601a5d
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class LongMaxAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new LongMaxAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(LongMaxAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerTest.java
new file mode 100644
index 0000000..c799a1a
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMaxAveragerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongMaxAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Long> avg = new LongMaxAverager(3, "test", "field", 1);
+
+ assertEquals(Long.MIN_VALUE, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", -1000000L), new HashMap<>());
+ assertEquals(-1000000, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", 1L), new HashMap<>());
+ assertEquals(1, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(1)), new HashMap<>());
+ assertEquals(1, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", 5L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 3L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ assertEquals(5, (long) avg.computeResult());
+
+ avg.skip();
+ assertEquals(3, (long) avg.computeResult());
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactoryTest.java
new file mode 100644
index 0000000..763d9c7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class LongMeanAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new LongMeanAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(LongMeanAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerTest.java
new file mode 100644
index 0000000..cb037a2
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanAveragerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongMeanAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> avg = new LongMeanAverager(3, "test", "field", 1);
+
+ assertEquals(0.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3L), new HashMap<>());
+ assertEquals(1.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3L), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3), new HashMap<>());
+ assertEquals(3.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.skip();
+ assertEquals(4.0 / 3, avg.computeResult(), 0.0);
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactoryTest.java
new file mode 100644
index 0000000..f3c4dac
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class LongMeanNoNullAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new LongMeanNoNullAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(LongMeanNoNullAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerTest.java
new file mode 100644
index 0000000..0681db7
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMeanNoNullAveragerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongMeanNoNullAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Double> avg = new LongMeanNoNullAverager(3, "test", "field", 1);
+
+ assertEquals(Double.NaN, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3L), new HashMap<>());
+ assertEquals(3.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 3L), new HashMap<>());
+ assertEquals(3.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(0)), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ assertEquals(2.0, avg.computeResult(), 0.0);
+
+ avg.skip();
+ assertEquals(2.0, avg.computeResult(), 0.0);
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactoryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactoryTest.java
new file mode 100644
index 0000000..067f6b2
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class LongMinAveragerFactoryTest
+{
+
+ @Test
+ public void testCreateAverager()
+ {
+ AveragerFactory<?, ?> fac = new LongMinAveragerFactory("test", 5, 1, "field");
+ assertThat(fac.createAverager(), instanceOf(LongMinAverager.class));
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerTest.java
new file mode 100644
index 0000000..4cbcdae
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/averagers/LongMinAveragerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.averagers;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongMinAveragerTest
+{
+
+ @Test
+ public void testComputeResult()
+ {
+ BaseAverager<Number, Long> avg = new LongMinAverager(3, "test", "field", 1);
+
+ assertEquals(Long.MAX_VALUE, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", -10000L), new HashMap<>());
+ assertEquals(-10000, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", 1L), new HashMap<>());
+ assertEquals(-10000, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", Integer.valueOf(1000)), new HashMap<>());
+ assertEquals(-10000, (long) avg.computeResult());
+
+ avg.addElement(Collections.singletonMap("field", 5L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 2L), new HashMap<>());
+ avg.addElement(Collections.singletonMap("field", 3L), new HashMap<>());
+ assertEquals(2, (long) avg.computeResult());
+
+ avg.skip();
+ avg.skip();
+ assertEquals(3, (long) avg.computeResult());
+ }
+
+}
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/test/TestConfig.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/test/TestConfig.java
new file mode 100644
index 0000000..792394e
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/test/TestConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage.test;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Map;
+
+/**
+ * Configuration for a unit test.
+ */
+public class TestConfig
+{
+ public ObjectNode query;
+ public ArrayNode expectedOutput;
+ public Map<String, ArrayNode> intermediateResults;
+}
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicGroupByMovingAverage.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicGroupByMovingAverage.yaml
new file mode 100644
index 0000000..a3d2d16
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicGroupByMovingAverage.yaml
@@ -0,0 +1,57 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_pl_dt_os
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 2
+ name: trailing7DayAvgTimeSpent
+ fieldName: timeSpent
+ type: doubleMean
+ aggregations:
+ - name: timespent_secs
+ fieldName: timespent
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: timeSpent
+ fn: /
+ fields:
+ - type: fieldAccess
+ fieldName: timespent_secs
+ - type: constant
+ name: seconds_per_minute
+ value: 60.0
+ postAveragers: [
+ ]
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ trailing7DayAvgTimeSpent: 3.0
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ timespent_secs: 120.0
+ timeSpent: 2.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicGroupByMovingAverage2.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicGroupByMovingAverage2.yaml
new file mode 100644
index 0000000..a3d2d16
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicGroupByMovingAverage2.yaml
@@ -0,0 +1,57 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_pl_dt_os
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 2
+ name: trailing7DayAvgTimeSpent
+ fieldName: timeSpent
+ type: doubleMean
+ aggregations:
+ - name: timespent_secs
+ fieldName: timespent
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: timeSpent
+ fn: /
+ fields:
+ - type: fieldAccess
+ fieldName: timespent_secs
+ - type: constant
+ name: seconds_per_minute
+ value: 60.0
+ postAveragers: [
+ ]
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ trailing7DayAvgTimeSpent: 3.0
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ timespent_secs: 120.0
+ timeSpent: 2.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicTimeseriesMovingAverage.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicTimeseriesMovingAverage.yaml
new file mode 100644
index 0000000..1458ed8
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/basicTimeseriesMovingAverage.yaml
@@ -0,0 +1,51 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_pl_dt_os
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions: []
+ averagers:
+ - buckets: 2
+ name: trailing7DayAvgTimeSpent
+ fieldName: timeSpent
+ type: doubleMean
+ aggregations:
+ - name: timespent_secs
+ fieldName: timespent
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: timeSpent
+ fn: /
+ fields:
+ - type: fieldAccess
+ fieldName: timespent_secs
+ - type: constant
+ name: seconds_per_minute
+ value: 60.0
+ postAveragers: [
+ ]
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ trailing7DayAvgTimeSpent: 3.0
+intermediateResults:
+ timeseries:
+ - timestamp: 2017-01-01T00:00Z
+ result:
+ timespent_secs: 120.0
+ timeSpent: 2.0
+ - timestamp: 2017-01-02T00:00Z
+ result:
+ timespent_secs: 240.0
+ timeSpent: 4.0
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/missingGroupByValues.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/missingGroupByValues.yaml
new file mode 100644
index 0000000..c4ab5a4
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/missingGroupByValues.yaml
@@ -0,0 +1,78 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_pl_dt_os
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 2
+ name: trailing7DayAvgTimeSpent
+ fieldName: timeSpent
+ type: doubleMean
+ aggregations:
+ - name: timespent_secs
+ fieldName: timespent
+ type: longSum
+ - name: someSum
+ fieldName: someSum_field
+ type: doubleSum
+ postAggregations:
+ - type: arithmetic
+ name: timeSpent
+ fn: /
+ fields:
+ - type: fieldAccess
+ fieldName: timespent_secs
+ - type: constant
+ name: seconds_per_minute
+ value: 60.0
+ postAveragers: [
+ ]
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240
+ timeSpent: 4.0
+ trailing7DayAvgTimeSpent: 3.0
+ someSum: 3.0
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ timespent_secs: 0
+ timeSpent: 0.0
+ trailing7DayAvgTimeSpent: 1.0
+ someSum: 0.0
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ timespent_secs: 120
+ timeSpent: 2.0
+ someSum: 5.0
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: f
+ timespent_secs: 120
+ timeSpent: 2.0
+ someSum: 2.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240
+ timeSpent: 4.0
+ someSum: 3.0
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingAveragersAsc.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingAveragersAsc.yaml
new file mode 100644
index 0000000..ba685ff
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingAveragersAsc.yaml
@@ -0,0 +1,81 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_pl_dt_os
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 2
+ name: trailing7DayAvgTimeSpent
+ fieldName: timeSpent
+ type: doubleMean
+ aggregations:
+ - name: timespent_secs
+ fieldName: timespent
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: timeSpent
+ fn: /
+ fields:
+ - type: fieldAccess
+ fieldName: timespent_secs
+ - type: constant
+ name: seconds_per_minute
+ value: 60.0
+ postAveragers: [
+ ]
+ limitSpec:
+ type: default
+ columns:
+ - dimension: trailing7DayAvgTimeSpent
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ trailing7DayAvgTimeSpent: 3.0
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ timespent_secs: 480.0
+ timeSpent: 8.0
+ trailing7DayAvgTimeSpent: 6.0
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ timespent_secs: 120.0
+ timeSpent: 2.0
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: f
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ timespent_secs: 480.0
+ timeSpent: 8.0
+
\ No newline at end of file
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingAveragersDesc.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingAveragersDesc.yaml
new file mode 100644
index 0000000..59f75bc
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingAveragersDesc.yaml
@@ -0,0 +1,82 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_pl_dt_os
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 2
+ name: trailing7DayAvgTimeSpent
+ fieldName: timeSpent
+ type: doubleMean
+ aggregations:
+ - name: timespent_secs
+ fieldName: timespent
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: timeSpent
+ fn: /
+ fields:
+ - type: fieldAccess
+ fieldName: timespent_secs
+ - type: constant
+ name: seconds_per_minute
+ value: 60.0
+ postAveragers: [
+ ]
+ limitSpec:
+ type: default
+ columns:
+ - dimension: trailing7DayAvgTimeSpent
+ direction: DESC
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ timespent_secs: 480.0
+ timeSpent: 8.0
+ trailing7DayAvgTimeSpent: 6.0
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ trailing7DayAvgTimeSpent: 3.0
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ timespent_secs: 120.0
+ timeSpent: 2.0
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: f
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ timespent_secs: 240.0
+ timeSpent: 4.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ timespent_secs: 480.0
+ timeSpent: 8.0
+
\ No newline at end of file
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingWithNonMovingAndMovingAvgMetric.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingWithNonMovingAndMovingAvgMetric.yaml
new file mode 100644
index 0000000..c7d7ddc
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingWithNonMovingAndMovingAvgMetric.yaml
@@ -0,0 +1,84 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_uc_ud
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 7
+ name: trailing7DayAvgTotalPageViews
+ fieldName: totalPageViews
+ type: doubleMean
+ aggregations:
+ - name: addPageViews
+ fieldName: additive_page_views
+ type: longSum
+ - name: pageViews
+ fieldName: other_page_views
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: totalPageViews
+ fn: +
+ fields:
+ - type: fieldAccess
+ fieldName: addPageViews
+ - type: fieldAccess
+ fieldName: pageViews
+ postAveragers: [
+ ]
+ limitSpec:
+ type: default
+ columns:
+ - dimension: addPageViews
+ direction: DESC
+ - dimension: trailing7DayAvgTotalPageViews
+ direction: DESC
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ addPageViews: 1.0
+ pageViews: 2.0
+ totalPageViews: 3.0
+ trailing7DayAvgTotalPageViews: 3.0
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ addPageViews: 0
+ pageViews: 0
+ totalPageViews: 0.0
+ trailing7DayAvgTotalPageViews: 2.142857142857143
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ addPageViews: 5.0
+ pageViews: 10.0
+ totalPageViews: 15.0
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: f
+ addPageViews: 6.0
+ pageViews: 12.0
+ totalPageViews: 18.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ addPageViews: 1.0
+ pageViews: 2.0
+ totalPageViews: 3.0
\ No newline at end of file
diff --git a/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingWithNonMovingAvgMetric.yaml b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingWithNonMovingAvgMetric.yaml
new file mode 100644
index 0000000..89ae941
--- /dev/null
+++ b/extensions-contrib/moving-average-query/src/test/resources/queryTests/sortingWithNonMovingAvgMetric.yaml
@@ -0,0 +1,82 @@
+query:
+ queryType: movingAverage
+ dataSource:
+ type: table
+ name: slice_pf_us_uc_ud
+ context: {
+ }
+ granularity:
+ type: period
+ period: P1D
+ intervals:
+ - 2017-01-02T00:00Z/2017-01-03T00:00Z
+ dimensions:
+ - gender
+ averagers:
+ - buckets: 7
+ name: trailing7DayAvgTotalPageViews
+ fieldName: totalPageViews
+ type: doubleMean
+ aggregations:
+ - name: addPageViews
+ fieldName: additive_page_views
+ type: longSum
+ - name: pageViews
+ fieldName: other_page_views
+ type: longSum
+ postAggregations:
+ - type: arithmetic
+ name: totalPageViews
+ fn: +
+ fields:
+ - type: fieldAccess
+ fieldName: addPageViews
+ - type: fieldAccess
+ fieldName: pageViews
+ postAveragers: [
+ ]
+ limitSpec:
+ type: default
+ columns:
+ - dimension: addPageViews
+ direction: DESC
+expectedOutput:
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ addPageViews: 1.0
+ pageViews: 2.0
+ totalPageViews: 3.0
+ trailing7DayAvgTotalPageViews: 3.0
+- version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: m
+ addPageViews: 0
+ pageViews: 0
+ totalPageViews: 0.0
+ trailing7DayAvgTotalPageViews: 2.142857142857143
+intermediateResults:
+ groupBy:
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: m
+ addPageViews: 5.0
+ pageViews: 10.0
+ totalPageViews: 15.0
+ - version: v1
+ timestamp: 2017-01-01T00:00Z
+ event:
+ gender: f
+ addPageViews: 6.0
+ pageViews: 12.0
+ totalPageViews: 18.0
+ - version: v1
+ timestamp: 2017-01-02T00:00Z
+ event:
+ gender: f
+ addPageViews: 1.0
+ pageViews: 2.0
+ totalPageViews: 3.0
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4f752e1..d52dd1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,6 +173,7 @@
<module>extensions-contrib/materialized-view-maintenance</module>
<module>extensions-contrib/materialized-view-selection</module>
<module>extensions-contrib/momentsketch</module>
+ <module>extensions-contrib/moving-average-query</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>
@@ -1580,4 +1581,4 @@
</build>
</profile>
</profiles>
-</project>
+</project>
\ No newline at end of file
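Note on the expected values in the YAML fixtures above: they are consistent with a doubleMean averager that divides the window sum by the configured number of buckets and counts a missing bucket as 0, whereas the assertions in LongMeanNoNullAveragerTest are consistent with dividing by the number of non-null buckets only. The following is a minimal sketch of that arithmetic, not the extension's implementation; the class and method names (BucketMeanSketch, meanOverAllBuckets, meanOverPresentBuckets) are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of the moving-average-query extension.
public class BucketMeanSketch
{
  /** Mean over a fixed window: a null (missing) bucket contributes 0 to the sum. */
  static double meanOverAllBuckets(List<Double> buckets)
  {
    double sum = 0.0;
    for (Double v : buckets) {
      sum += (v == null) ? 0.0 : v;
    }
    return sum / buckets.size();
  }

  /** Mean over the non-null buckets only; NaN when every bucket is empty. */
  static double meanOverPresentBuckets(List<Double> buckets)
  {
    double sum = 0.0;
    int present = 0;
    for (Double v : buckets) {
      if (v != null) {
        sum += v;
        present++;
      }
    }
    return present == 0 ? Double.NaN : sum / present;
  }

  public static void main(String[] args)
  {
    // basicGroupByMovingAverage.yaml: buckets = 2, timeSpent 2.0 then 4.0; YAML expects 3.0
    System.out.println(meanOverAllBuckets(Arrays.asList(2.0, 4.0)));

    // missingGroupByValues.yaml, gender f: 2.0 then no row; YAML expects 1.0
    System.out.println(meanOverAllBuckets(Arrays.asList(2.0, null)));

    // sortingWithNonMovingAvgMetric.yaml, gender m: buckets = 7, one value of 15.0;
    // YAML expects 2.142857142857143
    System.out.println(meanOverAllBuckets(Arrays.asList(15.0, null, null, null, null, null, null)));

    // LongMeanNoNullAveragerTest: window of 3 holding 2, 2 and one skipped slot; test expects 2.0
    System.out.println(meanOverPresentBuckets(Arrays.asList(2.0, 2.0, null)));
  }
}
```

The two helpers differ only in the divisor, which is why an empty window yields 0.0 under the first rule (for a non-zero bucket count) and NaN under the second.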