Posted to commits@druid.apache.org by ch...@apache.org on 2022/09/06 07:07:10 UTC

[druid] branch master updated: compressed big decimal - module (#10705)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9aef225d compressed big decimal - module  (#10705)
3d9aef225d is described below

commit 3d9aef225d302c663df8fd1d832b80108ea54047
Author: senthilkv <se...@verizonmedia.com>
AuthorDate: Tue Sep 6 03:06:57 2022 -0400

    compressed big decimal - module  (#10705)
    
    Compressed Big Decimal is an extension that provides a mutable big
    decimal value, which can be used to accumulate values without losing
    precision or reallocating memory. The type supports exact arithmetic
    on large numbers in applications that require a high level of
    accuracy, such as financial applications and currency-based
    transactions. This avoids rounding issues through which potentially
    large amounts of money can be lost.
    
    Accumulation requires that the two numbers have the same scale,
    but does not require that they are of the same size. If the value
    being accumulated has a larger underlying array than this value
    (the result), then the higher order bits are dropped, similar to what
    happens when adding a long to an int and storing the result in an
    int. The value is held in an embedded array.
    
    Compressed big decimal is a complex type based on big decimal in
    Java. It supports the functionality of Java's BigDecimal. Java's
    BigDecimal is immutable, so it cannot be updated in place;
    compressed big decimal is needed to mutate the value in the
    accumulator and so avoid garbage collection issues.
---
 .../extensions-contrib/compressed-big-decimal.md   | 241 +++++++++++
 docs/development/extensions.md                     |   1 +
 extensions-contrib/compressed-bigdecimal/pom.xml   | 130 ++++++
 .../ArrayCompressedBigDecimal.java                 | 180 ++++++++
 .../ByteBufferCompressedBigDecimal.java            | 121 ++++++
 .../compressedbigdecimal/CompressedBigDecimal.java | 329 +++++++++++++++
 .../CompressedBigDecimalAggregateCombiner.java     | 100 +++++
 .../CompressedBigDecimalAggregator.java            | 101 +++++
 .../CompressedBigDecimalAggregatorFactory.java     | 302 +++++++++++++
 .../CompressedBigDecimalBufferAggregator.java      | 115 +++++
 .../CompressedBigDecimalColumn.java                | 134 ++++++
 .../CompressedBigDecimalColumnPartSupplier.java    |  89 ++++
 .../CompressedBigDecimalJsonSerializer.java        |  45 ++
 .../CompressedBigDecimalLongColumnSerializer.java  | 122 ++++++
 .../CompressedBigDecimalMetricSerde.java           | 111 +++++
 .../CompressedBigDecimalModule.java                |  60 +++
 .../CompressedBigDecimalObjectStrategy.java        |  94 +++++
 .../compressedbigdecimal/ObjBiIntConsumer.java     |  40 ++
 .../apache/druid/compressedbigdecimal/Utils.java   | 176 ++++++++
 .../org.apache.druid.initialization.DruidModule    |  16 +
 .../AggregatorCombinerFactoryTest.java             | 181 ++++++++
 .../ArrayCompressedBigDecimalTest.java             | 465 +++++++++++++++++++++
 .../CompressedBigDecimalAggregatorGroupByTest.java | 150 +++++++
 ...mpressedBigDecimalAggregatorTimeseriesTest.java | 167 ++++++++
 .../src/test/resources/bd_test_aggregators.json    |   9 +
 .../src/test/resources/bd_test_data.csv            |   6 +
 .../src/test/resources/bd_test_data_parser.json    |  20 +
 .../src/test/resources/bd_test_groupby_query.json  |  19 +
 .../test/resources/bd_test_timeseries_query.json   |  25 ++
 .../src/test/resources/bd_test_zero_data.csv       |   1 +
 pom.xml                                            |   1 +
 website/.spelling                                  |  10 +
 32 files changed, 3561 insertions(+)
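
The accumulation described in the commit message (add into an existing value in place, with no reallocation) can be illustrated with a minimal sketch. This is not the extension's code: the class `InPlaceAddSketch` and its method `addInPlace` are hypothetical, and the sketch assumes the value is stored as 32-bit words in two's complement, least significant word first.

```java
// Hypothetical sketch, not the extension's implementation.
final class InPlaceAddSketch {
    private static final long INT_MASK = 0xffffffffL;

    /** Adds rhs into lhs in place; rhs must hold at least one word. */
    static void addInPlace(int[] lhs, int[] rhs) {
        long carry = 0;
        for (int i = 0; i < lhs.length; i++) {
            // Past the end of rhs, use its sign extension (all zeros or all ones).
            int r = i < rhs.length ? rhs[i] : rhs[rhs.length - 1] >> 31;
            long sum = (lhs[i] & INT_MASK) + (r & INT_MASK) + carry;
            lhs[i] = (int) sum;   // keep the low 32 bits
            carry = sum >>> 32;   // 0 or 1, carried into the next word
        }
        // Words of rhs beyond lhs.length, and any final carry, are dropped.
    }

    public static void main(String[] args) {
        int[] total = {0xFFFFFFFF, 0};          // 4294967295 as two words
        addInPlace(total, new int[] {1});
        System.out.println(java.util.Arrays.toString(total)); // prints [0, 1]

        int[] narrow = {0};
        addInPlace(narrow, new int[] {1, 1});   // rhs is 2^32 + 1
        System.out.println(narrow[0]);          // prints 1: the high word is dropped
    }
}
```

The second call mirrors the "adding a long to an int" analogy: only the low-order word of the wider operand survives in the narrower result.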

diff --git a/docs/development/extensions-contrib/compressed-big-decimal.md b/docs/development/extensions-contrib/compressed-big-decimal.md
new file mode 100644
index 0000000000..dc3c9651fe
--- /dev/null
+++ b/docs/development/extensions-contrib/compressed-big-decimal.md
@@ -0,0 +1,241 @@
+---
+id: compressed-big-decimal
+title: "Compressed Big Decimal"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+## Overview
+**Compressed Big Decimal** is an extension that provides a mutable big decimal value, which can be used to accumulate values without losing precision or reallocating memory. The type supports exact arithmetic on large numbers in applications that require a high level of accuracy, such as financial applications and currency-based transactions. This avoids rounding issues through which potentially large amounts of money can be lost.
+
+Accumulation requires that the two numbers have the same scale, but does not require that they are of the same size. If the value being accumulated has a larger underlying array than this value (the result), then the higher order bits are dropped, similar to what happens when adding a long to an int and storing the result in an int. The value is held in an embedded array.
+
+Compressed big decimal is a complex type based on big decimal in Java. It supports the functionality of Java's BigDecimal. Java's BigDecimal is immutable, so it cannot be updated in place; compressed big decimal is needed to mutate the value in the accumulator and so avoid garbage collection issues.
+
+#### Main enhancements provided by this extension:
+1. Functionality: A mutable big decimal type with greater precision
+2. Accuracy: Provides a greater level of accuracy in decimal arithmetic
+
+## Operations
+To use this extension, make sure to [load](../../development/extensions.md#loading-extensions) `druid-compressed-bigdecimal` in your config file.
+
+## Configuration
+There are currently no configuration properties specific to Compressed Big Decimal.
+
+## Limitations
+* Compressed Big Decimal does not produce a correct result when the value being accumulated has a larger underlying array than this value (the result). In that case the higher order bits are dropped, similar to what happens when adding a long to an int and storing the result in an int.
+
+
+### Ingestion Spec:
+* Most properties in the ingestion spec are derived from [Ingestion Spec](../../ingestion/index.md) / [Data Formats](../../ingestion/data-formats.md).
+
+
+|property|description|required?|
+|--------|-----------|---------|
+|metricsSpec|Metrics specification. When specifying metric details such as name and type, set the type to compressedBigDecimal.|yes|
+
+### Query spec:
+* Most properties in the query spec are derived from [groupBy query](../../querying/groupbyquery.md) / [timeseries](../../querying/timeseriesquery.md); see the documentation for these query types.
+
+|property|description|required?|
+|--------|-----------|---------|
+|queryType|This String should always be either "groupBy" OR "timeseries"; this is the first thing Druid looks at to figure out how to interpret the query.|yes|
+|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../../querying/datasource.md) for more information.|yes|
+|dimensions|A JSON list of [DimensionSpec](../../querying/dimensionspecs.md) (Notice that property is optional)|no|
+|limitSpec|See [LimitSpec](../../querying/limitspec.md)|no|
+|having|See [Having](../../querying/having.md)|no|
+|granularity|A period granularity; See [Period Granularities](../../querying/granularities.md#period-granularities)|yes|
+|filter|See [Filters](../../querying/filters.md)|no|
+|aggregations|See [Aggregations](../../querying/aggregations.md). For the compressedBigDecimal type, each aggregation must specify type, scale and size as follows: ```"aggregations": [{"type": "compressedBigDecimal","name": "..","fieldName": "..","scale": [Numeric],"size": [Numeric]}]```. Refer to the query examples in the Examples section.|yes|
+|postAggregations|Supports only aggregations as input; See [Post Aggregations](../../querying/post-aggregations.md)|no|
+|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
+|context|An additional JSON Object which can be used to specify certain flags.|no|
+
+## Examples
+
+Consider the following data:
+
+|Date|Item|SaleAmount|
+|--------|-----------|---------|
+
+```
+20201208,ItemA,0.0
+20201208,ItemB,10.000000000
+20201208,ItemA,-1.000000000
+20201208,ItemC,9999999999.000000000
+20201208,ItemB,5000000000.000000005
+20201208,ItemA,2.0
+20201208,ItemD,0.0
+```
+
+IngestionSpec syntax:
+
+```json
+{
+	"type": "index_parallel",
+	"spec": {
+		"dataSchema": {
+			"dataSource": "invoices",
+			"timestampSpec": {
+				"column": "timestamp",
+				"format": "yyyyMMdd"
+			},
+			"dimensionsSpec": {
+				"dimensions": [{
+					"type": "string",
+					"name": "itemName"
+				}]
+			},
+			"metricsSpec": [{
+				"name": "saleAmount",
+				"type": "compressedBigDecimal",
+				"fieldName": "saleAmount"
+			}],
+			"transformSpec": {
+				"filter": null,
+				"transforms": []
+			},
+			"granularitySpec": {
+				"type": "uniform",
+				"rollup": false,
+				"segmentGranularity": "DAY",
+				"queryGranularity": "none",
+				"intervals": ["2020-12-08/2020-12-09"]
+			}
+		},
+		"ioConfig": {
+			"type": "index_parallel",
+			"inputSource": {
+				"type": "local",
+				"baseDir": "/home/user/sales/data/staging/invoice-data",
+				"filter": "invoice-001.20201208.txt"
+			},
+			"inputFormat": {
+				"type": "tsv",
+				"delimiter": ",",
+				"skipHeaderRows": 0,
+				"columns": [
+					"timestamp",
+					"itemName",
+					"saleAmount"
+				]
+			}
+		},
+		"tuningConfig": {
+			"type": "index_parallel"
+		}
+	}
+}
+```
+### Group By Query Example
+
+Calculate total sales across all items.
+
+Query syntax:
+
+```json
+{
+    "queryType": "groupBy",
+    "dataSource": "invoices",
+    "granularity": "ALL",
+    "dimensions": [
+    ],
+    "aggregations": [
+        {
+            "type": "compressedBigDecimal",
+            "name": "revenue",
+            "fieldName": "saleAmount",
+            "scale": 9,
+            "size": 3
+
+        }
+    ],
+    "intervals": [
+        "2020-12-08T00:00:00.000Z/P1D"
+    ]
+}
+```
+
+Result:
+
+```json
+[ {
+  "version" : "v1",
+  "timestamp" : "2020-12-08T00:00:00.000Z",
+  "event" : {
+    "revenue" : 15000000010.000000005
+  }
+} ]
+```
+
+Had you used *doubleSum* instead of compressedBigDecimal, the result would be:
+
+```json
+[ {
+  "timestamp" : "2020-12-08T00:00:00.000Z",
+  "result" : {
+    "revenue" : 1.500000001E10
+  }
+} ]
+```
+As shown above, doubleSum drops the fractional 0.000000005; such precision loss can translate into lost money.
+
+### TimeSeries Query Example 
+
+Query syntax:
+
+```json
+{
+    "queryType": "timeseries",
+    "dataSource": "invoices",
+    "granularity": "ALL",
+    "aggregations": [
+        {
+            "type": "compressedBigDecimal",
+            "name": "revenue",
+            "fieldName": "saleAmount",
+            "scale": 9,
+            "size": 3
+        }
+    ],
+    "filter": {
+        "type": "not",
+        "field": {
+            "type": "selector",
+            "dimension": "itemName",
+            "value": "ItemD"
+        }
+    },
+    "intervals": [
+        "2020-12-08T00:00:00.000Z/P1D"
+    ]
+}
+```
+
+Result:
+
+```json
+[ {
+  "timestamp" : "2020-12-08T00:00:00.000Z",
+  "result" : {
+    "revenue" : 15000000010.000000005
+  }
+} ]
+```
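
The precision difference between the two results in the documentation above can be reproduced outside Druid with plain Java. The following is a minimal sketch under stated assumptions: the class `SumPrecisionSketch` and its methods are hypothetical, it uses `java.math.BigDecimal` as a stand-in for the extension's type, and it sums the seven sample `SaleAmount` values directly rather than running a query.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: compares a double sum with an exact decimal sum.
final class SumPrecisionSketch {
    /** Sums the values as doubles, the way a floating-point aggregator would. */
    static double doubleSum(List<String> values) {
        double total = 0.0;
        for (String v : values) {
            total += Double.parseDouble(v);
        }
        return total;
    }

    /** Sums the values exactly using BigDecimal. */
    static BigDecimal exactSum(List<String> values) {
        BigDecimal total = BigDecimal.ZERO;
        for (String v : values) {
            total = total.add(new BigDecimal(v));
        }
        return total;
    }

    public static void main(String[] args) {
        // SaleAmount column of the sample data.
        List<String> sales = Arrays.asList(
            "0.0", "10.000000000", "-1.000000000", "9999999999.000000000",
            "5000000000.000000005", "2.0", "0.0");
        System.out.println(doubleSum(sales));                // prints 1.500000001E10
        System.out.println(exactSum(sales).toPlainString()); // prints 15000000010.000000005
    }
}
```

A double carries roughly 15 to 17 significant decimal digits, so the trailing 0.000000005 on a value near five billion cannot be represented and is lost before the sum is even computed.
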
diff --git a/docs/development/extensions.md b/docs/development/extensions.md
index 49f61c8d74..ef2e69686b 100644
--- a/docs/development/extensions.md
+++ b/docs/development/extensions.md
@@ -79,6 +79,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
 |ambari-metrics-emitter|Ambari Metrics Emitter |[link](../development/extensions-contrib/ambari-metrics-emitter.md)|
 |druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.md)|
 |druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.md)|
+|druid-compressed-bigdecimal|Compressed Big Decimal Type | [link](../development/extensions-contrib/compressed-big-decimal.md)|
 |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.md)|
 |druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.md)|
 |druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.md)|
diff --git a/extensions-contrib/compressed-bigdecimal/pom.xml b/extensions-contrib/compressed-bigdecimal/pom.xml
new file mode 100644
index 0000000000..1923f2c884
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.druid</groupId>
+        <artifactId>druid</artifactId>
+        <version>25.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.druid.extensions.contrib</groupId>
+    <artifactId>druid-compressed-bigdecimal</artifactId>
+    <name>druid-compressed-bigdecimal</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <classifier>tests</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>java-hamcrest</artifactId>
+            <version>2.0.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>2.0.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+            <version>4.1.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>2.10.5</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>16.0.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.10.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.10.2</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java
new file mode 100644
index 0000000000..5f514b5dd4
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * A compressed big decimal that holds its data with an embedded array.
+ */
+@SuppressWarnings("serial")
+public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompressedBigDecimal>
+{
+
+  private static final int BYTE_MASK = 0xff;
+
+  private final int[] array;
+
+  /**
+   * Construct an ArrayCompressedBigDecimal using the specified initial
+   * value and scale.
+   *
+   * @param initialVal initial value
+   * @param scale      the scale to use
+   */
+  public ArrayCompressedBigDecimal(long initialVal, int scale)
+  {
+    super(scale);
+    this.array = new int[2];
+    this.array[0] = (int) initialVal;
+    this.array[1] = (int) (initialVal >>> 32);
+  }
+
+  /**
+   * Construct a CompressedBigDecimal from an equivalent {@link BigDecimal}.
+   * The passed value's unscaled number is converted into a byte array, which
+   * is then packed into the int array, least significant word first.
+   *
+   * @param initialVal The BigDecimal value.
+   */
+  public ArrayCompressedBigDecimal(BigDecimal initialVal)
+  {
+    super(initialVal.scale());
+    BigInteger unscaled = initialVal.unscaledValue();
+    byte[] bytes = unscaled.toByteArray();
+    int arrayLen = (bytes.length + 3) / 4;
+
+    this.array = new int[arrayLen];
+
+    if (initialVal.signum() == 0) {
+      // initial value is 0. Nothing to copy
+      return;
+    }
+
+    int bytesIdx = bytes.length;
+    for (int ii = 0; ii < arrayLen; ++ii) {
+      this.array[ii] =
+          (BYTE_MASK & bytes[--bytesIdx]) |
+              (bytesIdx != 0 ? (BYTE_MASK & bytes[--bytesIdx]) : (((int) bytes[0]) >> 8)) << 8 |
+              (bytesIdx != 0 ? (BYTE_MASK & bytes[--bytesIdx]) : (((int) bytes[0]) >> 8)) << 16 |
+              (bytesIdx != 0 ? (BYTE_MASK & bytes[--bytesIdx]) : (((int) bytes[0]) >> 8)) << 24;
+    }
+  }
+
+  /**
+   * Construct a CompressedBigDecimal that is a copy of the passed in value.
+   *
+   * @param initVal the initial value
+   */
+  public ArrayCompressedBigDecimal(CompressedBigDecimal<?> initVal)
+  {
+    super(initVal.getScale());
+    this.array = new int[initVal.getArraySize()];
+    for (int ii = 0; ii < initVal.getArraySize(); ++ii) {
+      this.array[ii] = initVal.getArrayEntry(ii);
+    }
+  }
+
+  /**
+   * Private constructor to build a CompressedBigDecimal that wraps
+   * a passed in array of ints. The constructor takes ownership of the
+   * array and its contents may be modified.
+   *
+   * @param array the initial magnitude
+   * @param scale the scale
+   */
+  private ArrayCompressedBigDecimal(int[] array, int scale)
+  {
+    super(scale);
+    this.array = array;
+  }
+
+  /**
+   * Static method to construct a CompressedBigDecimal that wraps an
+   * underlying array.
+   *
+   * @param array The array to wrap
+   * @param scale The scale to use
+   * @return A CompressedBigDecimal
+   */
+  public static ArrayCompressedBigDecimal wrap(int[] array, int scale)
+  {
+    return new ArrayCompressedBigDecimal(array, scale);
+  }
+
+  /**
+   * Allocate a new CompressedBigDecimal with the specified size and scale.
+   *
+   * @param size  size of the int array used for calculations
+   * @param scale scale of the number
+   * @return CompressedBigDecimal
+   */
+  public static ArrayCompressedBigDecimal allocate(int size, int scale)
+  {
+    int[] arr = new int[size];
+    return new ArrayCompressedBigDecimal(arr, scale);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
+   */
+  @Override
+  public int getArraySize()
+  {
+    return array.length;
+  }
+
+  /**
+   * Package private access to internal array.
+   *
+   * @return the array
+   */
+  int[] getArray()
+  {
+    return array;
+  }
+
+  /**
+   * Protected access to entry in internal array.
+   *
+   * @param idx index to retrieve
+   * @return the entry
+   */
+  @Override
+  protected int getArrayEntry(int idx)
+  {
+    return array[idx];
+  }
+
+  /**
+   * Protected access to set entry in internal array.
+   *
+   * @param idx index to set
+   * @param val value to set
+   */
+  @Override
+  protected void setArrayEntry(int idx, int val)
+  {
+    array[idx] = val;
+  }
+
+}
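
The `BigDecimal` constructor in the file above packs the unscaled value's big-endian bytes into an int array with the least significant word first. The following sketch restates that idea in a simpler form and adds the reverse conversion so the round trip can be checked. It is not the extension's code: the class `PackSketch` and the methods `pack` and `unpack` are hypothetical names introduced for illustration.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical sketch of packing a BigInteger into little-endian 32-bit words.
final class PackSketch {
    /** Packs the two's-complement value into words, least significant first. */
    static int[] pack(BigInteger unscaled) {
        byte[] bytes = unscaled.toByteArray();   // big-endian, minimal length
        int[] words = new int[(bytes.length + 3) / 4];
        int fill = bytes[0] < 0 ? 0xff : 0x00;   // sign-extension byte
        int idx = bytes.length;
        for (int w = 0; w < words.length; w++) {
            int word = 0;
            for (int b = 0; b < 4; b++) {
                int v = idx > 0 ? (bytes[--idx] & 0xff) : fill;
                word |= v << (8 * b);
            }
            words[w] = word;
        }
        return words;
    }

    /** Rebuilds the BigInteger from words produced by pack. */
    static BigInteger unpack(int[] words) {
        byte[] bytes = new byte[words.length * 4];
        for (int w = 0; w < words.length; w++) {
            int base = bytes.length - 4 * (w + 1);
            bytes[base] = (byte) (words[w] >>> 24);
            bytes[base + 1] = (byte) (words[w] >>> 16);
            bytes[base + 2] = (byte) (words[w] >>> 8);
            bytes[base + 3] = (byte) words[w];
        }
        return new BigInteger(bytes);
    }

    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("5000000000.000000005");
        int[] words = pack(value.unscaledValue());
        BigDecimal restored = new BigDecimal(unpack(words), value.scale());
        System.out.println(words.length);             // prints 2
        System.out.println(restored.toPlainString()); // prints 5000000000.000000005
    }
}
```

The scale is kept separately from the words, which is why two values can only be added word by word when their scales match.
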
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java
new file mode 100644
index 0000000000..542c9f92f0
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A compressed big decimal that holds its data in a {@link ByteBuffer}.
+ */
+@SuppressWarnings("serial")
+public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBufferCompressedBigDecimal>
+{
+
+  private final ByteBuffer buf;
+  private final int position;
+  private final int size;
+
+  /**
+   * Construct a ByteBufferCompressedBigDecimal that wraps the specified
+   * region of a byte buffer, using the specified scale.
+   *
+   * @param buf      The byte buffer to wrap
+   * @param position the position in the byte buffer where the data should be stored
+   * @param size     the size (in ints) of the value's region in the byte buffer
+   * @param scale    the scale to use
+   */
+  public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, int size, int scale)
+  {
+    super(scale);
+    this.buf = buf;
+    this.position = position;
+    this.size = size;
+  }
+
+  /**
+   * Construct a CompressedBigDecimal that uses a ByteBuffer for its storage and whose
+   * initial value is copied from the specified CompressedBigDecimal.
+   *
+   * @param buf      the ByteBuffer to use for storage
+   * @param position the position in the ByteBuffer
+   * @param val      initial value
+   */
+  public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, CompressedBigDecimal<?> val)
+  {
+    super(val.getScale());
+    this.buf = buf;
+    this.position = position;
+    this.size = val.getArraySize();
+
+    copyToBuffer(buf, position, size, val);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
+   */
+  @Override
+  public int getArraySize()
+  {
+    return size;
+  }
+
+  /**
+   * Protected access to entry in internal array.
+   *
+   * @param idx index to retrieve
+   * @return the entry
+   */
+  @Override
+  protected int getArrayEntry(int idx)
+  {
+    return buf.getInt(position + idx * Integer.BYTES);
+  }
+
+  /**
+   * Protected access to set entry in internal array.
+   *
+   * @param idx index to set
+   * @param val value to set
+   */
+  @Override
+  protected void setArrayEntry(int idx, int val)
+  {
+    buf.putInt(position + idx * Integer.BYTES, val);
+  }
+
+  /**
+   * Copy a compressed big decimal into a ByteBuffer in a format understood by this class.
+   *
+   * @param buf      The buffer
+   * @param position The position in the buffer to place the value
+   * @param size     The space (in number of ints) allocated for the value
+   * @param val      The value to copy
+   */
+  public static void copyToBuffer(ByteBuffer buf, int position, int size, CompressedBigDecimal<?> val)
+  {
+    if (val.getArraySize() > size) {
+      throw new IllegalArgumentException("Right hand side too big to fit in the result value");
+    }
+    for (int ii = 0; ii < size; ++ii) {
+      buf.putInt(position + ii * Integer.BYTES, val.getArrayEntry(ii));
+    }
+  }
+
+}
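
The `ByteBuffer`-backed class above addresses word `idx` of a value at byte offset `position + idx * Integer.BYTES`, using absolute `getInt` and `putInt` calls. A minimal sketch of that addressing scheme follows. The class `BufferSlotSketch` and its methods are hypothetical; only the `java.nio.ByteBuffer` calls are real API.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of storing several multi-word values in one buffer.
final class BufferSlotSketch {
    /** Writes the words of one value starting at the given byte position. */
    static void writeWords(ByteBuffer buf, int position, int[] words) {
        for (int i = 0; i < words.length; i++) {
            buf.putInt(position + i * Integer.BYTES, words[i]);
        }
    }

    /** Reads word idx of the value stored at the given byte position. */
    static int readWord(ByteBuffer buf, int position, int idx) {
        return buf.getInt(position + idx * Integer.BYTES);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        writeWords(buf, 0, new int[] {5, 0});    // first value, bytes 0..7
        writeWords(buf, 8, new int[] {7, -1});   // second value, bytes 8..15
        System.out.println(readWord(buf, 8, 0)); // prints 7
        System.out.println(buf.position());      // prints 0
    }
}
```

Because the absolute accessors leave the buffer's own position untouched, many values can share one buffer, each identified only by its starting offset.
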
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java
new file mode 100644
index 0000000000..12165171df
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.function.ToIntBiFunction;
+
+/**
+ * Mutable big decimal value that can be used to accumulate values without losing precision or reallocating memory.
+ * This helps in revenue-based calculations.
+ *
+ * @param <T> Type of actual derived class that contains the underlying data
+ */
+@SuppressWarnings("serial")
+public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> extends Number
+    implements Comparable<CompressedBigDecimal<T>>
+{
+
+  private static final long INT_MASK = 0x00000000ffffffffL;
+
+  private final int scale;
+
+  /**
+   * Construct a CompressedBigDecimal with the specified scale. Subclasses supply
+   * the array size and the accessors used to get and put array values.
+   *
+   * @param scale scale of the compressed big decimal
+   */
+  protected CompressedBigDecimal(int scale)
+  {
+    this.scale = scale;
+  }
+
+
+  /**
+   * Accumulate (add) the passed in value into the current total. This
+   * modifies the value of the current object. Accumulation requires that
+   * the two numbers have the same scale, but does not require that they are
+   * of the same size. The value being accumulated may have a smaller underlying
+   * array than this value (the result); if it has a larger one, an
+   * IllegalArgumentException is thrown rather than dropping the higher order bits.
+   *
+   * @param <S> type of compressedbigdecimal to accumulate
+   * @param rhs The object to accumulate
+   * @return a reference to <b>this</b>
+   */
+  public <S extends CompressedBigDecimal<S>> CompressedBigDecimal<T> accumulate(CompressedBigDecimal<S> rhs)
+  {
+    if (rhs.scale != scale) {
+      throw new IllegalArgumentException("Cannot accumulate MutableBigDecimals with differing scales");
+    }
+    if (rhs.getArraySize() > getArraySize()) {
+      throw new IllegalArgumentException("Right hand side too big to fit in the result value");
+    }
+    internalAdd(getArraySize(), this, CompressedBigDecimal::getArrayEntry, CompressedBigDecimal::setArrayEntry,
+        rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
+    return this;
+  }
+
+  /**
+   * Clear any accumulated value, resetting to zero. Scale is preserved at its original value.
+   *
+   * @return this
+   */
+  public CompressedBigDecimal<T> reset()
+  {
+    for (int ii = 0; ii < getArraySize(); ++ii) {
+      setArrayEntry(ii, 0);
+    }
+    return this;
+  }
+
+  /**
+   * Accumulate values into this mutable value. Values to be accumulated
+   * must have the same scale as the result. The incoming value may have
+   * more precision than the result. If so, the upper bits are truncated,
+   * similar to what happens when a long is assigned to an int.
+   *
+   * @param <R>    type of the object containing the lhs array
+   * @param <S>    type of object containing the rhs array
+   * @param llen   the underlying left array size
+   * @param lhs    the object containing the left array data
+   * @param lhsGet method reference to get an underlying left value
+   * @param lhsSet method reference to set an underlying left value
+   * @param rlen   the underlying right array size
+   * @param rhs    the object containing the right array data
+   * @param rhsGet method reference to get an underlying right value
+   */
+  static <R, S> void internalAdd(int llen, R lhs, ToIntBiFunction<R, Integer> lhsGet, ObjBiIntConsumer<R> lhsSet,
+                                 int rlen, S rhs, ToIntBiFunction<S, Integer> rhsGet)
+  {
+    int commonLen = Integer.min(llen, rlen);
+    long carry = 0;
+    long sum;
+    // add the common part, word by word, propagating the carry
+    for (int ii = 0; ii < commonLen; ++ii) {
+      sum = (INT_MASK & lhsGet.applyAsInt(lhs, ii)) + (INT_MASK & rhsGet.applyAsInt(rhs, ii)) + carry;
+      lhsSet.accept(lhs, ii, (int) sum);
+      carry = sum >>> 32;
+    }
+
+    long signExtension = signumInternal(rlen, rhs, rhsGet) < 0 ? INT_MASK : 0;
+
+    // for the remaining portion of the lhs that didn't have matching
+    // rhs values, just propagate any necessary carry and sign extension
+    for (int ii = commonLen; ii < llen && (carry != 0 || signExtension != 0); ++ii) {
+      sum = (INT_MASK & lhsGet.applyAsInt(lhs, ii)) + signExtension + carry;
+      lhsSet.accept(lhs, ii, (int) sum);
+      carry = sum >>> 32;
+    }
+    // don't do anything with remaining rhs. That value is lost due to overflow.
+  }
+
+  /**
+   * Get a byte array representing the magnitude of this value,
+   * formatted for use by {@link BigInteger#BigInteger(byte[])}.
+   *
+   * @return the byte array for use in BigInteger
+   */
+  private byte[] toByteArray()
+  {
+    int byteArrayLength = getArraySize() * 4;
+    byte[] bytes = new byte[byteArrayLength];
+
+    int byteIdx = 0;
+    for (int ii = getArraySize(); ii > 0; --ii) {
+      int val = getArrayEntry(ii - 1);
+      bytes[byteIdx + 3] = (byte) val;
+      val >>>= 8;
+      bytes[byteIdx + 2] = (byte) val;
+      val >>>= 8;
+      bytes[byteIdx + 1] = (byte) val;
+      val >>>= 8;
+      bytes[byteIdx] = (byte) val;
+      byteIdx += 4;
+    }
+
+    int leadingZeros = Integer.numberOfLeadingZeros(getArrayEntry(getArraySize() - 1));
+    int emptyBytes = leadingZeros / 8;
+    if (emptyBytes != 0) {
+      if (emptyBytes == byteArrayLength || leadingZeros % 8 == 0) {
+        // don't get rid of all the leading zeros if it is the complete number (array size must
+        // be a minimum of 1) or if trimming the byte might change the sign of the value (first
+        // one is on a byte boundary).
+        emptyBytes--;
+      }
+      return Arrays.copyOfRange(bytes, emptyBytes, byteArrayLength);
+    }
+
+    return bytes;
+  }
+
+  /**
+   * Return the scale of the value.
+   *
+   * @return the scale
+   */
+  public int getScale()
+  {
+    return scale;
+  }
+
+  /**
+   * Return the array size.
+   *
+   * @return the array size
+   */
+  protected abstract int getArraySize();
+
+  /**
+   * Get value from the array.
+   *
+   * @param idx the index
+   * @return value from the array at that index
+   */
+  protected abstract int getArrayEntry(int idx);
+
+  /**
+   * Set value in the array.
+   *
+   * @param idx the index
+   * @param val the value
+   */
+  protected abstract void setArrayEntry(int idx, int val);
+
+  /**
+   * Create a {@link BigDecimal} with the equivalent value to this
+   * instance.
+   *
+   * @return the BigDecimal value
+   */
+  public BigDecimal toBigDecimal()
+  {
+    BigInteger bigInt = new BigInteger(toByteArray());
+    return new BigDecimal(bigInt, scale);
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString()
+  {
+    return toBigDecimal().toString();
+  }
+
+  /**
+   * Returns the signum function of this {@code CompressedBigDecimal}.
+   *
+   * @return -1, 0, or 1 as the value of this {@code CompressedBigDecimal} is negative, zero, or positive.
+   */
+  public int signum()
+  {
+    return signumInternal(getArraySize(), this, CompressedBigDecimal::getArrayEntry);
+  }
+
+  /**
+   * Internal implementation of signum.
+   * For the provided compressed big decimal value, it checks and returns
+   * -1 if negative,
+   * 0 if zero,
+   * 1 if positive.
+   * @param <S>     type of object containing the array
+   * @param size    the underlying array size
+   * @param rhs     object that contains the underlying array
+   * @param valFunc method reference to get an underlying value
+   * @return -1, 0, or 1 as the provided value is negative, zero, or positive.
+   */
+  protected static <S> int signumInternal(int size, S rhs, ToIntBiFunction<S, Integer> valFunc)
+  {
+    if (valFunc.applyAsInt(rhs, size - 1) < 0) {
+      return -1;
+    } else {
+      int agg = 0;
+      for (int ii = 0; ii < size; ++ii) {
+        agg |= valFunc.applyAsInt(rhs, ii);
+      }
+      if (agg == 0) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Comparable#compareTo(java.lang.Object)
+   */
+  @Override
+  public int compareTo(CompressedBigDecimal<T> o)
+  {
+
+    if (this.equals(o)) {
+      return 0;
+    }
+    return this.toBigDecimal().compareTo(o.toBigDecimal());
+  }
+
+  /**
+   * Returns the value of the specified number as an {@code int},
+   * which may involve rounding or truncation.
+   *
+   * @return the numeric value represented by this object after conversion
+   * to type {@code int}.
+   */
+  @Override
+  public int intValue()
+  {
+    return toBigDecimal().setScale(0, BigDecimal.ROUND_HALF_UP).intValue();
+  }
+
+  /**
+   * Returns the value of the specified number as a {@code long},
+   * which may involve rounding or truncation.
+   *
+   * @return the numeric value represented by this object after conversion
+   * to type {@code long}.
+   */
+  @Override
+  public long longValue()
+  {
+    return toBigDecimal().setScale(0, BigDecimal.ROUND_HALF_UP).longValue();
+  }
+
+  /**
+   * Returns the value of the specified number as a {@code float},
+   * which may involve rounding.
+   *
+   * @return the numeric value represented by this object after conversion
+   * to type {@code float}.
+   */
+  @Override
+  public float floatValue()
+  {
+    return toBigDecimal().floatValue();
+  }
+
+  /**
+   * Returns the value of the specified number as a {@code double},
+   * which may involve rounding.
+   *
+   * @return the numeric value represented by this object after conversion
+   * to type {@code double}.
+   */
+  @Override
+  public double doubleValue()
+  {
+    return toBigDecimal().doubleValue();
+  }
+}
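
Editor's sketch (not part of the commit): the carry and sign-extension logic
of internalAdd and the byte layout produced by toByteArray can be tried in
isolation with plain int arrays. The class and method names below are
hypothetical and exist only for illustration. The sketch assumes non-empty
arrays that hold a two's-complement value with the least significant 32-bit
word first, which is how the loops above index the array.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical illustration class; it does not exist in the Druid code base.
class CarryAddSketch
{
  private static final long INT_MASK = 0xffffffffL;

  // Adds rhs into lhs in place. Both arrays hold a two's-complement value,
  // least significant 32-bit word first. rhs must not be longer than lhs.
  static void add(int[] lhs, int[] rhs)
  {
    if (rhs.length > lhs.length) {
      throw new IllegalArgumentException("rhs does not fit in lhs");
    }
    long carry = 0;
    for (int i = 0; i < rhs.length; i++) {
      // Masking treats each word as unsigned so the sum fits in a long.
      long sum = (INT_MASK & lhs[i]) + (INT_MASK & rhs[i]) + carry;
      lhs[i] = (int) sum;  // keep the low 32 bits
      carry = sum >>> 32;  // 0 or 1
    }
    // A negative rhs is conceptually padded with all-ones words.
    long signExtension = rhs[rhs.length - 1] < 0 ? INT_MASK : 0;
    for (int i = rhs.length; i < lhs.length && (carry != 0 || signExtension != 0); i++) {
      long sum = (INT_MASK & lhs[i]) + signExtension + carry;
      lhs[i] = (int) sum;
      carry = sum >>> 32;
    }
  }

  // Converts the words to a BigDecimal by writing them most significant
  // byte first, the format expected by BigInteger(byte[]).
  static BigDecimal toBigDecimal(int[] words, int scale)
  {
    byte[] bytes = new byte[words.length * Integer.BYTES];
    for (int i = 0; i < words.length; i++) {
      int w = words[words.length - 1 - i];
      bytes[4 * i] = (byte) (w >>> 24);
      bytes[4 * i + 1] = (byte) (w >>> 16);
      bytes[4 * i + 2] = (byte) (w >>> 8);
      bytes[4 * i + 3] = (byte) w;
    }
    return new BigDecimal(new BigInteger(bytes), scale);
  }

  public static void main(String[] args)
  {
    int[] acc = {5, 0};
    add(acc, new int[]{-7});  // one-word negative addend
    System.out.println(toBigDecimal(acc, 0));
  }
}
```

Unlike toByteArray above, the sketch does not trim leading zero bytes;
BigInteger accepts the untrimmed array, so the trimming appears to be an
optimization rather than a correctness requirement.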
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java
new file mode 100644
index 0000000000..05d0451c0d
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+
+import org.apache.druid.query.aggregation.AggregateCombiner;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+/**
+ * AggregateCombiner for CompressedBigDecimals.
+ */
+public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<CompressedBigDecimal<?>>
+{
+  private CompressedBigDecimal<?> sum;
+
+  @Override
+  public void reset(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
+  {
+    @SuppressWarnings("unchecked")
+    ColumnValueSelector<CompressedBigDecimal<?>> selector =
+        (ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
+
+    CompressedBigDecimal<?> cbd = selector.getObject();
+    if (sum == null) {
+      sum = new ArrayCompressedBigDecimal(cbd);
+    } else {
+      sum.reset();
+      sum.accumulate(cbd);
+    }
+  }
+
+  @Override
+  public void fold(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
+  {
+    @SuppressWarnings("unchecked")
+    ColumnValueSelector<CompressedBigDecimal<?>> selector =
+        (ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
+
+    CompressedBigDecimal<?> cbd = selector.getObject();
+
+    if (sum == null) {
+      sum = new ArrayCompressedBigDecimal(cbd);
+    } else {
+      if (cbd.signum() != 0) {
+        sum.accumulate(cbd);
+      }
+    }
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalCombiner does not support getDouble()");
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalCombiner does not support getFloat()");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalCombiner does not support getLong()");
+  }
+
+  @Nullable
+  @Override
+  public CompressedBigDecimal<?> getObject()
+  {
+    return sum;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class<CompressedBigDecimal<?>> classOfObject()
+  {
+    return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java
new file mode 100644
index 0000000000..591131492f
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+/**
+ * An Aggregator to aggregate big decimal values.
+ */
+public class CompressedBigDecimalAggregator implements Aggregator
+{
+
+  private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
+  private final CompressedBigDecimal<?> sum;
+
+  /**
+   * Constructor.
+   *
+   * @param size     the size to allocate
+   * @param scale    the scale
+   * @param selector that has the metric value
+   */
+  public CompressedBigDecimalAggregator(
+      int size,
+      int scale,
+      ColumnValueSelector<CompressedBigDecimal<?>> selector
+  )
+  {
+    this.selector = selector;
+    this.sum = ArrayCompressedBigDecimal.allocate(size, scale);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.Aggregator#aggregate()
+   */
+  @Override
+  public void aggregate()
+  {
+    CompressedBigDecimal<?> selectedObject = selector.getObject();
+    if (selectedObject != null) {
+      if (selectedObject.getScale() != sum.getScale()) {
+        selectedObject = Utils.scaleUp(selectedObject);
+      }
+      sum.accumulate(selectedObject);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.Aggregator#get()
+   */
+  @Override
+  public Object get()
+  {
+    return sum;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.Aggregator#getFloat()
+   */
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalAggregator does not support getFloat()");
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.Aggregator#getLong()
+   */
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalAggregator does not support getLong()");
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.Aggregator#close()
+   */
+  @Override
+  public void close()
+  {
+    // no resources to clean up
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java
new file mode 100644
index 0000000000..a75f31e1b2
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregateCombiner;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * An aggregator factory that creates aggregators which sum CompressedBigDecimal values.
+ */
+public class CompressedBigDecimalAggregatorFactory
+    extends NullableNumericAggregatorFactory<ColumnValueSelector<CompressedBigDecimal<?>>>
+{
+
+  public static final int DEFAULT_SCALE = 9;
+  public static final int DEFAULT_SIZE = 3;
+  private static final byte CACHE_TYPE_ID = 0x37;
+
+  public static final Comparator<CompressedBigDecimal<?>> COMPARATOR = new Comparator<CompressedBigDecimal<?>>()
+  {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Override
+    public int compare(CompressedBigDecimal lhs, CompressedBigDecimal rhs)
+    {
+      return lhs.compareTo(rhs);
+    }
+  };
+
+  private final String name;
+  private final String fieldName;
+  private final int size;
+  private final int scale;
+
+  /**
+   * Constructor.
+   *
+   * @param name      output metric name
+   * @param fieldName name of the input column to aggregate
+   * @param size      size of the int array used for calculations
+   * @param scale     scale of the number
+   */
+  @JsonCreator
+  public CompressedBigDecimalAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName,
+      @JsonProperty(value = "size", required = false) Integer size,
+      @JsonProperty(value = "scale", required = false) Integer scale
+  )
+  {
+    this.name = name;
+    this.fieldName = fieldName;
+    this.size = size == null ? DEFAULT_SIZE : size;
+    this.scale = scale == null ? DEFAULT_SCALE : scale;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected ColumnValueSelector<CompressedBigDecimal<?>> selector(ColumnSelectorFactory metricFactory)
+  {
+    return (ColumnValueSelector<CompressedBigDecimal<?>>) metricFactory.makeColumnValueSelector(fieldName);
+  }
+
+  @Override
+  protected Aggregator factorize(ColumnSelectorFactory metricFactory,
+                                 @Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
+  {
+    return new CompressedBigDecimalAggregator(size, scale, selector);
+  }
+
+  @Override
+  protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory,
+                                               @Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
+  {
+    return new CompressedBigDecimalBufferAggregator(size, scale, selector);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getComparator()
+   */
+  @Override
+  public Comparator<CompressedBigDecimal<?>> getComparator()
+  {
+    return COMPARATOR;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#combine(java.lang.Object, java.lang.Object)
+   */
+  @Nullable
+  @Override
+  public Object combine(Object lhs, Object rhs)
+  {
+    if (lhs == null && rhs == null) {
+      return ArrayCompressedBigDecimal.allocate(size, scale);
+    } else if (lhs == null) {
+      return rhs;
+    } else if (rhs == null) {
+      return lhs;
+    } else {
+      // Allocate a new result and accumulate both left and right into it.
+      // This ensures that the result has the correct scale, avoiding possible IllegalArgumentExceptions
+      // due to truncation when the deserialized objects aren't big enough to hold the accumulated result.
+      // The most common case this avoids is deserializing 0E-9 into a CompressedBigDecimal with array
+      // size 1 and then accumulating a larger value into it.
+      CompressedBigDecimal<?> retVal = ArrayCompressedBigDecimal.allocate(size, scale);
+      CompressedBigDecimal<?> left = (CompressedBigDecimal<?>) lhs;
+      CompressedBigDecimal<?> right = (CompressedBigDecimal<?>) rhs;
+      if (left.signum() != 0) {
+        retVal.accumulate(left);
+      }
+      if (right.signum() != 0) {
+        retVal.accumulate(right);
+      }
+      return retVal;
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getCombiningFactory()
+   */
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new CompressedBigDecimalAggregatorFactory(name, name, size, scale);
+  }
+
+  @Override
+  public AggregateCombiner<CompressedBigDecimal<?>> makeAggregateCombiner()
+  {
+    return new CompressedBigDecimalAggregateCombiner();
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getRequiredColumns()
+   */
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(new CompressedBigDecimalAggregatorFactory(
+        fieldName,
+        fieldName,
+        size,
+        scale
+    ));
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#deserialize(java.lang.Object)
+   */
+  @Nullable
+  @Override
+  public Object deserialize(Object object)
+  {
+    if (object == null) {
+      return null;
+    } else if (object instanceof BigDecimal) {
+      return new ArrayCompressedBigDecimal((BigDecimal) object);
+    } else if (object instanceof Double) {
+      return new ArrayCompressedBigDecimal(new BigDecimal((Double) object));
+    } else if (object instanceof String) {
+      return new ArrayCompressedBigDecimal(new BigDecimal((String) object));
+    } else {
+      throw new RuntimeException("unknown type in deserialize: " + object.getClass().getSimpleName());
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#requiredFields()
+   */
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  /* (non-Javadoc) Get Type */
+  @Override
+  public ValueType getType()
+  {
+    return ValueType.COMPLEX;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getComplexTypeName()
+   */
+  @Override
+  public String getComplexTypeName()
+  {
+    return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getCacheKey()
+   */
+  @Override
+  public byte[] getCacheKey()
+  {
+    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
+    return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#finalizeComputation(java.lang.Object)
+   */
+  @Override
+  public Object finalizeComputation(Object object)
+  {
+    CompressedBigDecimal<?> compressedBigDecimal = (CompressedBigDecimal<?>) object;
+    BigDecimal bigDecimal = compressedBigDecimal.toBigDecimal();
+    return bigDecimal.compareTo(BigDecimal.ZERO) == 0 ? 0 : bigDecimal;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getName()
+   */
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  /**
+   * Get the field name.
+   *
+   * @return dimension/metric field name
+   */
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @JsonProperty
+  public int getScale()
+  {
+    return scale;
+  }
+
+  @JsonProperty
+  public int getSize()
+  {
+    return size;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.AggregatorFactory#getMaxIntermediateSize()
+   */
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return Integer.BYTES * size;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CompressedBigDecimalAggregatorFactory{" +
+        "name='" + getName() + '\'' +
+        ", type='" + getComplexTypeName() + '\'' +
+        ", fieldName='" + getFieldName() + '\'' +
+        ", requiredFields='" + requiredFields() + '\'' +
+        ", size='" + getSize() + '\'' +
+        ", scale='" + getScale() + '\'' +
+        '}';
+  }
+}
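
Editor's sketch (not part of the commit): finalizeComputation above tests for
zero with compareTo rather than equals. A plausible reason is that
BigDecimal.equals also compares scale, so a zero carrying the aggregator's
scale (for example 0E-9) is not equal to BigDecimal.ZERO. The class and
method names below are hypothetical; only java.math.BigDecimal is real.

```java
import java.math.BigDecimal;

// Hypothetical illustration class; it does not exist in the Druid code base.
class ZeroFinalizeSketch
{
  // Mirrors the zero check in finalizeComputation: any zero, whatever its
  // scale, is reported as the Integer 0; other values pass through unchanged.
  static Object finalizeValue(BigDecimal value)
  {
    return value.compareTo(BigDecimal.ZERO) == 0 ? (Object) Integer.valueOf(0) : value;
  }

  public static void main(String[] args)
  {
    BigDecimal scaledZero = new BigDecimal("0E-9");
    System.out.println(scaledZero.equals(BigDecimal.ZERO));     // scale-sensitive comparison
    System.out.println(scaledZero.compareTo(BigDecimal.ZERO));  // numeric comparison
    System.out.println(finalizeValue(scaledZero));
    System.out.println(finalizeValue(new BigDecimal("1.50")));
  }
}
```

One consequence worth knowing when consuming query results: the finalized
value is an Integer for zero sums and a BigDecimal otherwise, so callers
should treat it as a Number.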
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java
new file mode 100644
index 0000000000..f5f2de44a3
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A buffered aggregator to aggregate big decimal values.
+ */
+public class CompressedBigDecimalBufferAggregator implements BufferAggregator
+{
+
+  // The aggregated value lives directly in the ByteBuffer handed to each method:
+  // `size` ints starting at the given position. No state is cached in this object.
+  private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
+  private final int size;
+  private final int scale;
+
+  /**
+   * Constructor.
+   *
+   * @param size     the size to allocate
+   * @param scale    the scale
+   * @param selector a ColumnValueSelector to retrieve incoming values
+   */
+  public CompressedBigDecimalBufferAggregator(
+      int size,
+      int scale,
+      ColumnValueSelector<CompressedBigDecimal<?>> selector
+  )
+  {
+    this.selector = selector;
+    this.size = size;
+    this.scale = scale;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.BufferAggregator#init(java.nio.ByteBuffer, int)
+   */
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    for (int ii = 0; ii < size; ++ii) {
+      buf.putInt(position + (ii * Integer.BYTES), 0);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.BufferAggregator#aggregate(java.nio.ByteBuffer, int)
+   */
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    CompressedBigDecimal<?> addend = selector.getObject();
+    if (addend != null) {
+      Utils.accumulate(buf, position, size, scale, addend);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.BufferAggregator#get(java.nio.ByteBuffer, int)
+   */
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return new ByteBufferCompressedBigDecimal(buf, position, size, scale);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.BufferAggregator#getFloat(java.nio.ByteBuffer, int)
+   */
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalBufferAggregator does not support getFloat()");
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.BufferAggregator#getLong(java.nio.ByteBuffer, int)
+   */
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("CompressedBigDecimalBufferAggregator does not support getLong()");
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.druid.query.aggregation.BufferAggregator#close()
+   */
+  @Override
+  public void close()
+  {
+    // no resources to clean up
+  }
+}
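
Editor's sketch (not part of the commit): the buffer aggregator keeps its
running total inside a shared ByteBuffer, as `size` ints starting at
`position`. Utils.accumulate is not shown in this part of the diff, so the
accumulate method below is an assumption about what such a helper could look
like, not the commit's implementation. All names are hypothetical; only the
absolute getInt/putInt calls of java.nio.ByteBuffer are real API.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class; it does not exist in the Druid code base.
class BufferSlotSketch
{
  private static final long INT_MASK = 0xffffffffL;

  // Zeroes the slot, matching what init(ByteBuffer, int) does above.
  static void init(ByteBuffer buf, int position, int size)
  {
    for (int i = 0; i < size; i++) {
      buf.putInt(position + i * Integer.BYTES, 0);
    }
  }

  // Adds a two's-complement addend (least significant word first) into the
  // slot in place, using only absolute reads and writes so the buffer's own
  // position is never moved.
  static void accumulate(ByteBuffer buf, int position, int size, int[] addend)
  {
    if (addend.length > size) {
      throw new IllegalArgumentException("addend does not fit in the slot");
    }
    long signExtension = addend[addend.length - 1] < 0 ? INT_MASK : 0;
    long carry = 0;
    for (int i = 0; i < size; i++) {
      int offset = position + i * Integer.BYTES;
      long rhsWord = i < addend.length ? (INT_MASK & addend[i]) : signExtension;
      long sum = (INT_MASK & buf.getInt(offset)) + rhsWord + carry;
      buf.putInt(offset, (int) sum);
      carry = sum >>> 32;
    }
  }

  // Copies the slot out as an int array, least significant word first.
  static int[] read(ByteBuffer buf, int position, int size)
  {
    int[] words = new int[size];
    for (int i = 0; i < size; i++) {
      words[i] = buf.getInt(position + i * Integer.BYTES);
    }
    return words;
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(32);
    init(buf, 8, 3);
    accumulate(buf, 8, 3, new int[]{Integer.MAX_VALUE});
    System.out.println(java.util.Arrays.toString(read(buf, 8, 3)));
  }
}
```

Because every access is absolute, several aggregation slots can share one
buffer without interfering with each other, which is consistent with
getMaxIntermediateSize() returning Integer.BYTES * size per slot.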
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
new file mode 100644
index 0000000000..52833e0556
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.ObjectColumnSelector;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.data.ColumnarInts;
+import org.apache.druid.segment.data.ColumnarMultiInts;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.ReadableOffset;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Accumulating Big Decimal column in druid segment.
+ */
+public class CompressedBigDecimalColumn implements ComplexColumn
+{
+  public static final Logger LOGGER = new Logger(CompressedBigDecimalColumn.class);
+
+  private final ColumnarInts scale;
+  private final ColumnarMultiInts magnitude;
+
+  /**
+   * Constructor.
+   *
+   * @param scale     scale of the rows
+   * @param magnitude multi-value int column holding the magnitude of each row
+   */
+  public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts magnitude)
+  {
+    this.scale = scale;
+    this.magnitude = magnitude;
+  }
+
+  @Override
+  public Class<CompressedBigDecimalColumn> getClazz()
+  {
+    return CompressedBigDecimalColumn.class;
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
+  }
+
+  @Override
+  public CompressedBigDecimal<?> getRowValue(int rowNum)
+  {
+    int s = scale.get(rowNum);
+
+    // make a copy of the value from magnitude because the IndexedInts returned
+    // from druid is mutable and druid reuses the object for future calls.
+    IndexedInts vals = magnitude.get(rowNum);
+    int size = vals.size();
+    int[] array = new int[size];
+    for (int ii = 0; ii < size; ++ii) {
+      array[ii] = vals.get(ii);
+    }
+
+    return ArrayCompressedBigDecimal.wrap(array, s);
+  }
+
+  @Override
+  public int getLength()
+  {
+    return scale.size();
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(final ReadableOffset offset)
+  {
+    return new ObjectColumnSelector<CompressedBigDecimal<?>>()
+    {
+      @Override @Nullable
+      public CompressedBigDecimal<?> getObject()
+      {
+        return getRowValue(offset.getOffset());
+      }
+
+      @Override @SuppressWarnings("unchecked")
+      public Class<CompressedBigDecimal<?>> classOfObject()
+      {
+        return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("column", CompressedBigDecimalColumn.this);
+      }
+    };
+  }
+
+  @Override
+  public void close()
+  {
+    try {
+      scale.close();
+    }
+    catch (IOException ex) {
+      LOGGER.error(ex, "failed to clean up scale part of CompressedBigDecimalColumn");
+    }
+    try {
+      magnitude.close();
+    }
+    catch (IOException ex) {
+      LOGGER.error(ex, "failed to clean up magnitude part of CompressedBigDecimalColumn");
+    }
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java
new file mode 100644
index 0000000000..0b58827e8a
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+
+import com.google.common.base.Supplier;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
+import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier;
+import java.nio.ByteBuffer;
+
+/**
+ * Complex column supplier that understands {@link CompressedBigDecimal} values.
+ */
+public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexColumn>
+{
+
+  public static final int VERSION = 0x1;
+
+  private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
+  private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;
+
+  /**
+   * Constructor.
+   *
+   * @param scaleSupplier     scale supplier
+   * @param magnitudeSupplier magnitude supplier
+   */
+  public CompressedBigDecimalColumnPartSupplier(
+      CompressedVSizeColumnarIntsSupplier scaleSupplier,
+      V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
+  )
+  {
+    this.scaleSupplier = scaleSupplier;
+    this.magnitudeSupplier = magnitudeSupplier;
+  }
+
+  /**
+   * Reads a column part supplier from its serialized form.
+   *
+   * @param buffer Byte buffer
+   * @return new instance of CompressedBigDecimalColumnPartSupplier
+   */
+  public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(
+      ByteBuffer buffer
+  )
+  {
+    byte versionFromBuffer = buffer.get();
+
+    if (versionFromBuffer == VERSION) {
+
+      CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+          buffer,
+          IndexIO.BYTE_ORDER);
+
+      V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
+          V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER);
+
+      return new CompressedBigDecimalColumnPartSupplier(scaleSupplier, magnitudeSupplier);
+    } else {
+      throw new IAE("Unknown version[%s]", versionFromBuffer);
+    }
+  }
+
+  @Override
+  public ComplexColumn get()
+  {
+    return new CompressedBigDecimalColumn(scaleSupplier.get(), magnitudeSupplier.get());
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java
new file mode 100644
index 0000000000..49b3ed23da
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+
+/**
+ * CompressedBigDecimal json serializer.
+ */
+@SuppressWarnings("rawtypes")
+public class CompressedBigDecimalJsonSerializer extends JsonSerializer<CompressedBigDecimal>
+{
+
+  /* (non-Javadoc)
+   * @see JsonSerializer#serialize(java.lang.Object, com.fasterxml.jackson.core.JsonGenerator,
+   * com.fasterxml.jackson.databind.SerializerProvider)
+   */
+  @Override
+  public void serialize(CompressedBigDecimal value, JsonGenerator jgen, SerializerProvider provider)
+      throws IOException
+  {
+    jgen.writeString(value.toString());
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
new file mode 100644
index 0000000000..e31f845e0f
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.data.ArrayBasedIndexedInts;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Locale;
+
+/**
+ * Column serializer that writes a CompressedBigDecimal as a scale plus an array of 4-byte int values.
+ */
+public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSerializer<CompressedBigDecimal<?>>
+{
+  private static final byte VERSION = CompressedBigDecimalColumnPartSupplier.VERSION;
+
+  /**
+   * Static constructor.
+   *
+   * @param segmentWriteOutMedium the segment write-out medium
+   * @param filenameBase          filename of the index
+   * @return a constructed CompressedBigDecimalLongColumnSerializer
+   */
+  public static CompressedBigDecimalLongColumnSerializer create(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      String filenameBase)
+  {
+    return new CompressedBigDecimalLongColumnSerializer(
+        CompressedVSizeColumnarIntsSerializer.create(
+            "dummy",
+            segmentWriteOutMedium,
+            String.format(Locale.ROOT, "%s.scale", filenameBase),
+            16,
+            CompressionStrategy.LZ4),
+        V3CompressedVSizeColumnarMultiIntsSerializer.create(
+            "dummy",
+            segmentWriteOutMedium,
+            String.format(Locale.ROOT, "%s.magnitude", filenameBase),
+            Integer.MAX_VALUE,
+            CompressionStrategy.LZ4));
+  }
+
+  private final CompressedVSizeColumnarIntsSerializer scaleWriter;
+  private final V3CompressedVSizeColumnarMultiIntsSerializer magnitudeWriter;
+
+  /**
+   * Constructor.
+   *
+   * @param scaleWriter     the scale writer
+   * @param magnitudeWriter the magnitude writer
+   */
+  public CompressedBigDecimalLongColumnSerializer(
+      CompressedVSizeColumnarIntsSerializer scaleWriter,
+      V3CompressedVSizeColumnarMultiIntsSerializer magnitudeWriter
+  )
+  {
+    this.scaleWriter = scaleWriter;
+    this.magnitudeWriter = magnitudeWriter;
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    scaleWriter.open();
+    magnitudeWriter.open();
+  }
+
+  @Override
+  public void serialize(ColumnValueSelector<? extends CompressedBigDecimal<?>> obj) throws IOException
+  {
+    CompressedBigDecimal<?> abd = obj.getObject();
+    int[] array = new int[abd.getArraySize()];
+    for (int ii = 0; ii < abd.getArraySize(); ++ii) {
+      array[ii] = abd.getArrayEntry(ii);
+    }
+
+    scaleWriter.addValue(abd.getScale());
+    magnitudeWriter.addValues(new ArrayBasedIndexedInts(array));
+  }
+
+  @Override
+  public long getSerializedSize() throws IOException
+  {
+    return 1 +   // version
+        scaleWriter.getSerializedSize() +
+        magnitudeWriter.getSerializedSize();
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    channel.write(ByteBuffer.wrap(new byte[] {VERSION}));
+    scaleWriter.writeTo(channel, smoosher);
+    magnitudeWriter.writeTo(channel, smoosher);
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java
new file mode 100644
index 0000000000..d1193111c9
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.ComplexMetricExtractor;
+import org.apache.druid.segment.serde.ComplexMetricSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+/**
+ * ComplexMetricSerde that understands how to read and write CompressedBigDecimal values.
+ */
+public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
+{
+
+  private CompressedBigDecimalObjectStrategy strategy = new CompressedBigDecimalObjectStrategy();
+
+  /* (non-Javadoc)
+   * @see ComplexMetricSerde#getTypeName()
+   */
+  @Override
+  public String getTypeName()
+  {
+    return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
+  }
+
+  @Override
+  public ComplexMetricExtractor<CompressedBigDecimal<?>> getExtractor()
+  {
+    return new ComplexMetricExtractor<CompressedBigDecimal<?>>()
+    {
+      @SuppressWarnings("unchecked")
+      @Override
+      public Class<CompressedBigDecimal<?>> extractedClass()
+      {
+        return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
+      }
+
+      @Override
+      public CompressedBigDecimal<?> extractValue(InputRow inputRow, String metricName)
+      {
+        Object rawMetric = inputRow.getRaw(metricName);
+        if (rawMetric == null) {
+          return null;
+        } else if (rawMetric instanceof BigDecimal) {
+          return new ArrayCompressedBigDecimal((BigDecimal) rawMetric);
+        } else if (rawMetric instanceof String) {
+          return new ArrayCompressedBigDecimal(new BigDecimal((String) rawMetric));
+        } else if (rawMetric instanceof CompressedBigDecimal<?>) {
+          return (CompressedBigDecimal<?>) rawMetric;
+        } else {
+          throw new ISE("Unknown extraction value type: [%s]", rawMetric.getClass().getSimpleName());
+        }
+      }
+    };
+  }
+
+  /* (non-Javadoc)
+   * @see ComplexMetricSerde#deserializeColumn(java.nio.ByteBuffer, org.apache.druid.segment.column.ColumnBuilder)
+   */
+  @Override
+  public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
+  {
+    builder.setComplexColumnSupplier(
+        CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer));
+  }
+
+  /* (non-Javadoc)
+   * @see ComplexMetricSerde#getSerializer(org.apache.druid.segment.writeout.SegmentWriteOutMedium,
+   * java.lang.String)
+   */
+  @Override
+  public CompressedBigDecimalLongColumnSerializer getSerializer(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      String column)
+  {
+    return CompressedBigDecimalLongColumnSerializer.create(segmentWriteOutMedium, column);
+  }
+
+  /* (non-Javadoc)
+   * @see ComplexMetricSerde#getObjectStrategy()
+   */
+  @Override
+  public ObjectStrategy<CompressedBigDecimal<?>> getObjectStrategy()
+  {
+    return strategy;
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalModule.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalModule.java
new file mode 100644
index 0000000000..cde4f3fc0b
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalModule.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import java.util.List;
+
+/**
+ * Druid module for BigDecimal complex metrics and aggregator.
+ */
+public class CompressedBigDecimalModule implements DruidModule
+{
+  public static final String COMPRESSED_BIG_DECIMAL = "compressedBigDecimal";
+
+  @Override
+  public void configure(Binder binder)
+  {
+    if (ComplexMetrics.getSerdeForType(COMPRESSED_BIG_DECIMAL) == null) {
+      ComplexMetrics.registerSerde(COMPRESSED_BIG_DECIMAL, new CompressedBigDecimalMetricSerde());
+    }
+  }
+
+  @Override
+  public List<Module> getJacksonModules()
+  {
+    return ImmutableList.of(
+        new SimpleModule("CompressedBigDecimalModule")
+            .registerSubtypes(
+                new NamedType(CompressedBigDecimalAggregatorFactory.class, COMPRESSED_BIG_DECIMAL)
+            )
+            .addSerializer(
+                CompressedBigDecimal.class,
+                new CompressedBigDecimalJsonSerializer()
+            )
+    );
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java
new file mode 100644
index 0000000000..8ef284b82b
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.segment.data.ObjectStrategy;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+
+/**
+ * Defines strategy on how to read and write data from deep storage.
+ */
+public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<CompressedBigDecimal<?>>
+{
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public int compare(CompressedBigDecimal o1, CompressedBigDecimal o2)
+  {
+    return o1.compareTo(o2);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.druid.segment.data.ObjectStrategy#getClazz()
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public Class getClazz()
+  {
+    return CompressedBigDecimal.class;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.druid.segment.data.ObjectStrategy#fromByteBuffer(java.nio.ByteBuffer, int)
+   */
+  @Override
+  public CompressedBigDecimal<?> fromByteBuffer(ByteBuffer buffer, int numBytes)
+  {
+    ByteBuffer myBuf = buffer.slice();
+    myBuf.order(ByteOrder.LITTLE_ENDIAN);
+    IntBuffer intBuf = myBuf.asIntBuffer();
+    int scale = intBuf.get();
+    int[] array = new int[numBytes / 4 - 1];
+    intBuf.get(array);
+
+    return ArrayCompressedBigDecimal.wrap(array, scale);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.druid.segment.data.ObjectStrategy#toBytes(java.lang.Object)
+   */
+  @Override
+  public byte[] toBytes(CompressedBigDecimal<?> val)
+  {
+    ByteBuffer buf = ByteBuffer.allocate(4 * (val.getArraySize() + 1));
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+    IntBuffer intBuf = buf.asIntBuffer();
+    intBuf.put(val.getScale());
+    for (int ii = 0; ii < val.getArraySize(); ++ii) {
+      intBuf.put(val.getArrayEntry(ii));
+    }
+
+    return buf.array();
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ObjBiIntConsumer.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ObjBiIntConsumer.java
new file mode 100644
index 0000000000..8122a61e94
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ObjBiIntConsumer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+/**
+ * Represents an operation that accepts an object-valued and two int-valued arguments, and returns no result.
+ * This is the (reference, int, int) specialization of BiConsumer. Unlike most other functional interfaces,
+ * ObjBiIntConsumer is expected to operate via side-effects.
+ *
+ * @param <T> Type of Object for first argument
+ */
+@FunctionalInterface
+public interface ObjBiIntConsumer<T>
+{
+  /**
+   * Performs this operation on the given arguments.
+   *
+   * @param t    object arg
+   * @param val1 first int argument
+   * @param val2 second int argument
+   */
+  void accept(T t, int val1, int val2);
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java
new file mode 100644
index 0000000000..2e86504c56
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.segment.data.IndexedInts;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.function.ToIntBiFunction;
+
+/**
+ * Utility operations for accumulation.
+ */
+public class Utils
+{
+  /**
+   * Accumulate (add) the passed in value into the current total. This
+   * modifies the value of the current object. The scale of the BigDecimal is adjusted to match
+   * the current accumulating scale. If the value being accumulated has a larger underlying array
+   * than this value (the result), then the higher order bits are dropped, similar to
+   * what happens when adding a long to an int and storing the result in an int.
+   *
+   * @param <S> Type of CompressedBigDecimal into which to accumulate
+   * @param lhs The object into which to accumulate
+   * @param rhs The object to accumulate
+   * @return a reference to {@code lhs}
+   */
+  public static <S extends CompressedBigDecimal<S>>
+      CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, BigDecimal rhs)
+  {
+    CompressedBigDecimal<ArrayCompressedBigDecimal> abd =
+        new ArrayCompressedBigDecimal(rhs.setScale(lhs.getScale()));
+    return lhs.accumulate(abd);
+  }
+
+  /**
+   * Accumulate (add) the passed in value into the current total. This
+   * modifies the value of the current object. Accumulation requires that
+   * the two numbers have the same scale, but does not require that they are
+   * of the same size. If the value being accumulated has a larger underlying array
+   * than this value (the result), then the higher order bits are dropped, similar to
+   * what happens when adding a long to an int and storing the result in an int.
+   *
+   * @param <S>      Type of CompressedBigDecimal into which to accumulate
+   * @param lhs      The object into which to accumulate
+   * @param rhs      The object to accumulate
+   * @param rhsScale The scale to apply to the long being accumulated
+   * @return a reference to {@code lhs}
+   */
+  public static <S extends CompressedBigDecimal<S>>
+      CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, long rhs, int rhsScale)
+  {
+    CompressedBigDecimal<ArrayCompressedBigDecimal> abd = new ArrayCompressedBigDecimal(rhs, rhsScale);
+    return lhs.accumulate(abd);
+  }
+
+  /**
+   * Accumulate using IndexedInts read from Druid's segment file.
+   *
+   * @param <S>      Type of CompressedBigDecimal into which to accumulate
+   * @param lhs      The object into which to accumulate
+   * @param rhs      IndexedInts representing array of magnitude values
+   * @param rhsScale the scale
+   * @return a reference to {@code lhs}
+   */
+  public static <S extends CompressedBigDecimal<S>>
+      CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, IndexedInts rhs, int rhsScale)
+  {
+    if (rhs.size() > lhs.getArraySize()) {
+      throw new IllegalArgumentException("Right hand side too big to fit in the result value");
+    }
+    CompressedBigDecimal.internalAdd(lhs.getArraySize(), lhs, CompressedBigDecimal::getArrayEntry,
+        CompressedBigDecimal::setArrayEntry, rhs.size(), rhs, IndexedInts::get);
+    return lhs;
+  }
+
+  /**
+   * Accumulate using ByteBuffers for Druid BufferAggregator.
+   *
+   * @param buf      The byte buffer that contains the result to accumulate into
+   * @param pos      The initial position within the buffer
+   * @param lhsSize  The array size of the left
+   * @param lhsScale The scale of the left
+   * @param rhs      the right side to accumulate
+   */
+  public static void accumulate(ByteBuffer buf, int pos, int lhsSize, int lhsScale, CompressedBigDecimal<?> rhs)
+  {
+    if (rhs.getArraySize() > lhsSize) {
+      throw new IllegalArgumentException("Right hand side too big to fit in the result value");
+    }
+    BufferAccessor accessor = BufferAccessor.prepare(pos);
+    CompressedBigDecimal.internalAdd(lhsSize, buf, accessor, accessor,
+        rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
+  }
+
+  /**
+   * Returns a {@code CompressedBigDecimal} rescaled to the default scale, rounding away from zero.
+   *
+   * @param <S> Type of CompressedBigDecimal to scale
+   * @param val The value to scale up
+   * @return Scaled up compressedBigDecimal
+   */
+  public static <S extends CompressedBigDecimal<S>>
+      CompressedBigDecimal<ArrayCompressedBigDecimal> scaleUp(CompressedBigDecimal<S> val)
+  {
+    return new ArrayCompressedBigDecimal(
+        val.toBigDecimal().setScale(CompressedBigDecimalAggregatorFactory.DEFAULT_SCALE, BigDecimal.ROUND_UP)
+    );
+  }
+
+  /**
+   * Helper class that maintains a cache of thread local objects that can be used to access
+   * a ByteBuffer in {@link Utils#accumulate(ByteBuffer, int, int, int, CompressedBigDecimal)}.
+   */
+  private static class BufferAccessor implements ToIntBiFunction<ByteBuffer, Integer>, ObjBiIntConsumer<ByteBuffer>
+  {
+    private static ThreadLocal<BufferAccessor> cache = new ThreadLocal<BufferAccessor>()
+    {
+      @Override
+      protected BufferAccessor initialValue()
+      {
+        return new BufferAccessor();
+      }
+    };
+
+    private int position = 0;
+
+    /**
+     * Initializes the BufferAccessor with the location that should be used for access.
+     *
+     * @param position position within the buffer
+     * @return An initialized BufferAccessor
+     */
+    public static BufferAccessor prepare(int position)
+    {
+      BufferAccessor accessor = cache.get();
+      accessor.position = position;
+      return accessor;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.druid.compressedbigdecimal.ObjBiIntConsumer#accept(java.lang.Object, int, int)
+     */
+    @Override
+    public void accept(ByteBuffer buf, int idx, int val)
+    {
+      buf.putInt(position + idx * Integer.BYTES, val);
+    }
+
+    /* (non-Javadoc)
+     * @see java.util.function.ToIntBiFunction#applyAsInt(java.lang.Object, java.lang.Object)
+     */
+    @Override
+    public int applyAsInt(ByteBuffer buf, Integer idx)
+    {
+      return buf.getInt(position + idx * Integer.BYTES);
+    }
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/compressed-bigdecimal/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 0000000000..0d67028028
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.compressedbigdecimal.CompressedBigDecimalModule
\ No newline at end of file
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java
new file mode 100644
index 0000000000..407e59dfaf
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.data.ColumnarInts;
+import org.apache.druid.segment.data.ColumnarMultiInts;
+import org.apache.druid.segment.data.ReadableOffset;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+public class AggregatorCombinerFactoryTest
+{
+  /**
+   * Test method for {@link CompressedBigDecimalColumn}.
+   */
+  @Test
+  public void testCompressedBigDecimalColumn()
+  {
+    ColumnarMultiInts cmi = EasyMock.createMock(ColumnarMultiInts.class);
+    ColumnarInts ci = EasyMock.createMock(ColumnarInts.class);
+    ReadableOffset ro = EasyMock.createMock(ReadableOffset.class);
+    CompressedBigDecimalColumn cbr = new CompressedBigDecimalColumn(ci, cmi);
+    assertEquals(CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, cbr.getTypeName());
+    assertEquals(0, cbr.getLength());
+    assertEquals(CompressedBigDecimalColumn.class, cbr.getClazz());
+    assertNotNull(cbr.makeColumnValueSelector(ro));
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregatorFactory}.
+   */
+  @Test
+  public void testCompressedBigDecimalAggregatorFactory()
+  {
+    CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
+    assertEquals("CompressedBigDecimalAggregatorFactory{name='name', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}", cf.toString());
+    assertNotNull(cf.getCacheKey());
+    assertNull(cf.deserialize(null));
+    assertEquals("5", cf.deserialize(new BigDecimal(5)).toString());
+    assertEquals("5", cf.deserialize(5d).toString());
+    assertEquals("5", cf.deserialize("5").toString());
+    assertEquals("[CompressedBigDecimalAggregatorFactory{name='fieldName', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}]", Arrays.toString(cf.getRequiredColumns().toArray()));
+    assertEquals("0", cf.combine(null, null).toString());
+    assertEquals("4", cf.combine(new BigDecimal(4), null).toString());
+    assertEquals("4", cf.combine(null, new BigDecimal(4)).toString());
+    assertEquals("8", cf.combine(new ArrayCompressedBigDecimal(new BigDecimal(4)), new ArrayCompressedBigDecimal(new BigDecimal(4))).toString());
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregatorFactory#deserialize(Object)}.
+   */
+  @Test (expected = RuntimeException.class)
+  public void testCompressedBigDecimalAggregatorFactoryDeserialize()
+  {
+    CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
+    cf.deserialize(5);
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalBufferAggregator#getFloat(ByteBuffer, int)}
+   */
+  @Test (expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalBufferAggregatorGetFloat()
+  {
+    ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
+    ByteBuffer bbuf = ByteBuffer.allocate(10);
+    CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
+    ca.getFloat(bbuf, 0);
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalBufferAggregator#getLong(ByteBuffer, int)}
+   */
+  @Test (expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalBufferAggregatorGetLong()
+  {
+    ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
+    ByteBuffer bbuf = ByteBuffer.allocate(10);
+    CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
+    ca.getLong(bbuf, 0);
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregateCombiner#getObject()}
+   */
+  @Test
+  public void testCompressedBigDecimalAggregateCombinerGetObject()
+  {
+    CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
+    CompressedBigDecimal c = cc.getObject();
+    assertSame(null, c);
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregateCombiner#getClass()}
+   */
+  @Test
+  public void testCompressedBigDecimalAggregateCombinerClassofObject()
+  {
+    CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
+    assertSame(CompressedBigDecimalAggregateCombiner.class, cc.getClass());
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregateCombiner#getLong()}
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalAggregateCombinerGetLong()
+  {
+    CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
+    cc.getLong();
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregateCombiner#getFloat()}
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalAggregateCombinerGetFloat()
+  {
+    CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
+    cc.getFloat();
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregateCombiner#getDouble()}
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalAggregateCombinerGetDouble()
+  {
+    CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
+    cc.getDouble();
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregator#getFloat()}
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalAggregatorGetFloat()
+  {
+    ColumnValueSelector cv = EasyMock.createMock(ColumnValueSelector.class);
+    CompressedBigDecimalAggregator cc = new CompressedBigDecimalAggregator(2, 0, cv);
+    cc.getFloat();
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalAggregator#getLong()}
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCompressedBigDecimalAggregatorGetLong()
+  {
+    ColumnValueSelector cv = EasyMock.createMock(ColumnValueSelector.class);
+    CompressedBigDecimalAggregator cc = new CompressedBigDecimalAggregator(2, 0, cv);
+    cc.getLong();
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimalTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimalTest.java
new file mode 100644
index 0000000000..abed5c1027
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimalTest.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static org.apache.druid.compressedbigdecimal.Utils.accumulate;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for ArrayCompressedBigDecimal.
+ */
+public class ArrayCompressedBigDecimalTest
+{
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal(long, int)}.
+   */
+  @Test
+  public void testLongConstructorZero()
+  {
+    // Validate simple 0 case with longs.
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(0, 0);
+    d.reset();
+    assertEquals(0, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(0, array[0]);
+    assertEquals(0, array[1]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testLongConstructorPositive()
+  {
+    // Validate positive number that doesn't flow into the next int
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(Integer.MAX_VALUE, 9);
+    ArrayCompressedBigDecimal dl = d;
+    assertEquals(9, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(Integer.MAX_VALUE, array[0]);
+    assertEquals(0, array[1]);
+    assertEquals(0, d.compareTo(new ArrayCompressedBigDecimal(Integer.MAX_VALUE, 9)));
+    assertEquals(0, d.compareTo(dl));
+
+
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testLongConstructorNegative()
+  {
+    // validate negative number correctly fills in upper bits.
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(Integer.MIN_VALUE, 5);
+    assertEquals(5, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(Integer.MIN_VALUE, array[0]);
+    assertEquals(-1, array[1]);
+    assertEquals(-21475, d.intValue());
+    assertEquals(-21475, d.longValue());
+    assertEquals(-21475, d.shortValue());
+
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorZero()
+  {
+    // simple zero case to test short circuiting
+    BigDecimal bd = new BigDecimal(0);
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(0, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(1, array.length);
+    assertEquals(0, array[0]);
+    assertEquals("0", d.toString());
+    assertEquals(0, d.signum());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorSmallPositive()
+  {
+    // simple one int positive example
+    BigDecimal bd = new BigDecimal(Integer.MAX_VALUE).scaleByPowerOfTen(-9);
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(9, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(1, array.length);
+    assertEquals(Integer.MAX_VALUE, array[0]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorSmallNegative()
+  {
+    // simple one int negative example
+    BigDecimal bd = new BigDecimal(Integer.MIN_VALUE).scaleByPowerOfTen(-5);
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(5, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(1, array.length);
+    assertEquals(Integer.MIN_VALUE, array[0]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorLargePositive()
+  {
+    // simple two int positive example
+    BigDecimal bd = new BigDecimal(Long.MAX_VALUE).scaleByPowerOfTen(-9);
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(9, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(-1, array[0]);
+    assertEquals(Integer.MAX_VALUE, array[1]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorLargeNegative()
+  {
+    // simple two int negative example
+    BigDecimal bd = new BigDecimal(Long.MIN_VALUE).scaleByPowerOfTen(-5);
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(5, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(0, array[0]);
+    assertEquals(Integer.MIN_VALUE, array[1]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorUnevenMultiplePositive()
+  {
+    // test positive when number of bytes in BigDecimal isn't an even multiple of sizeof(int)
+    BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1, -1, -1}));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(0, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(-1, array[0]);
+    assertEquals(0x7f, array[1]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorUnevenMultipleNegative()
+  {
+    // test negative when number of bytes in BigDecimal isn't an even multiple of sizeof(int)
+    BigDecimal bd = new BigDecimal(new BigInteger(-1, new byte[] {Byte.MIN_VALUE, 0, 0, 0, 0}));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    assertEquals(0, d.getScale());
+    int[] array = d.getArray();
+    assertEquals(2, array.length);
+    assertEquals(0, array[0]);
+    assertEquals(Byte.MIN_VALUE, array[1]);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal(CompressedBigDecimal)}.
+   */
+  @Test
+  public void testCopyConstructor()
+  {
+    BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1, -1, -1}));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+
+    ArrayCompressedBigDecimal d2 = new ArrayCompressedBigDecimal(d);
+    assertEquals(d.getScale(), d2.getScale());
+    assertArrayEquals(d.getArray(), d2.getArray());
+    assertNotSame(d.getArray(), d2.getArray());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#wrap(int[], int)}.
+   */
+  @Test
+  public void testWrap()
+  {
+    int[] array = new int[] {Integer.MAX_VALUE, -1};
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(array, 0);
+    assertSame(array, bd.getArray());
+    assertEquals(0, bd.getScale());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#allocate(int, int)}.
+   */
+  @Test
+  public void testAllocate()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 5);
+    assertEquals(5, bd.getScale());
+    assertEquals(2, bd.getArray().length);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test
+  public void testSimpleAccumulate()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 0);
+
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
+    bd.accumulate(add);
+    assertArrayEquals(new int[] {1, 0}, bd.getArray());
+    bd.accumulate(add);
+    assertArrayEquals(new int[] {2, 0}, bd.getArray());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test
+  public void testSimpleAccumulateOverflow()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0x80000000, 0}, 0);
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {0x7fffffff, 0}, 0);
+    ArrayCompressedBigDecimal add1 = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
+    bd.accumulate(add);
+    assertArrayEquals(new int[] {0xffffffff, 0}, bd.getArray());
+    bd.accumulate(add1);
+    assertArrayEquals(new int[] {0, 1}, bd.getArray());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test
+  public void testSimpleAccumulateUnderflow()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0, 1}, 0);
+
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {-1, -1}, 0);
+
+    bd.accumulate(add);
+    assertArrayEquals(new int[] {0xffffffff, 0}, bd.getArray());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test
+  public void testUnevenAccumulateUnderflow()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0, 1}, 0);
+
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {-1}, 0);
+
+    bd.accumulate(add);
+    assertArrayEquals(new int[] {0xffffffff, 0}, bd.getArray());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test
+  public void testUnevenAccumulateOverflow()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0xffffffff, 1}, 0);
+
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {1}, 0);
+
+    bd.accumulate(add);
+    assertArrayEquals(new int[] {0, 2}, bd.getArray());
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnevenAccumulateOverflowWithTruncate()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {Integer.MAX_VALUE}, 0);
+
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {1, 1}, 0);
+
+    bd.accumulate(add);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testAccumulateScaleMismatch()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 1);
+    ArrayCompressedBigDecimal add = new ArrayCompressedBigDecimal(1, 0);
+    bd.accumulate(add);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#toBigDecimal()}.
+   */
+  @Test
+  public void testToBigDecimal()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {1}, 0);
+    assertEquals(BigDecimal.ONE, bd.toBigDecimal());
+
+    bd = ArrayCompressedBigDecimal.wrap(new int[] {Integer.MAX_VALUE}, 0);
+    assertEquals(new BigDecimal(Integer.MAX_VALUE), bd.toBigDecimal());
+
+    bd = ArrayCompressedBigDecimal.wrap(new int[] {0}, 0);
+    assertEquals(BigDecimal.ZERO, bd.toBigDecimal());
+    bd = ArrayCompressedBigDecimal.wrap(new int[] {0, 0}, 0);
+    assertEquals(BigDecimal.ZERO, bd.toBigDecimal());
+    bd = new ArrayCompressedBigDecimal(-1, 9);
+    assertEquals(new BigDecimal(-1).scaleByPowerOfTen(-9), bd.toBigDecimal());
+    bd = ArrayCompressedBigDecimal.wrap(new int[] {1410065408, 2}, 9);
+    assertEquals(new BigDecimal(10).setScale(9), bd.toBigDecimal());
+  }
+
+  /**
+   * Test method for {@link ByteBufferCompressedBigDecimal}.
+   */
+  @Test
+  public void testBigDecimalConstructorwithByteBuffer()
+  {
+    BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1}));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    ByteBuffer buf = ByteBuffer.allocate(4);
+    CompressedBigDecimal cbd = new ByteBufferCompressedBigDecimal(buf, 0, d);
+    assertEquals(0, cbd.getScale());
+    assertEquals(8388607, cbd.intValue());
+    assertEquals(new Long(8388607L).doubleValue(), cbd.floatValue(), 0.001);
+    assertEquals(new Long(8388607L).doubleValue(), cbd.doubleValue(), 0.001);
+  }
+
+  /**
+   * Test method for {@link ArrayCompressedBigDecimal#setArrayEntry}.
+   */
+  @Test
+  public void testSetArrayEntry()
+  {
+    BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1}));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    d.setArrayEntry(0, 2);
+    assertEquals(2, d.intValue());
+  }
+
+  /**
+   * Test method for {@link  ByteBufferCompressedBigDecimal#copyToBuffer(ByteBuffer, int, int, CompressedBigDecimal)}
+   */
+  @Test
+  public void testCopyToBuffer()
+  {
+    ByteBuffer bb = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 0, 0, 4});
+    ByteBufferCompressedBigDecimal bbdl = new ByteBufferCompressedBigDecimal(bb, 0, 1, 0);
+    bbdl.setArrayEntry(0, 2);
+    assertEquals(2, bbdl.intValue());
+  }
+
+  /**
+   * Test method for {@link Utils#accumulate(ByteBuffer, int, int, int, CompressedBigDecimal)}
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testUtilsAccumulateByteBuf()
+  {
+    BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1}));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    ByteBuffer buf = ByteBuffer.allocate(4);
+    accumulate(buf, 0, 1, 2, new ArrayCompressedBigDecimal(new BigDecimal(Long.MAX_VALUE)));
+  }
+
+  /**
+   * Test method for {@link Utils#accumulate(CompressedBigDecimal, long, int)}
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testUtilsAccumulateCbdWithException()
+  {
+    BigDecimal bd = new BigDecimal(new BigInteger("1"));
+    ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
+    accumulate(d, 0L, 1);
+  }
+
+  /**
+   * Test method for {@link Utils#accumulate(CompressedBigDecimal, long, int)}
+   */
+  @Test
+  public void testUtilsAccumulateCbd()
+  {
+    ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 0);
+    ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
+    bd.accumulate(add);
+    accumulate(bd, 1, 0);
+    assertEquals("2", bd.toString());
+    CompressedBigDecimal x = accumulate(bd, new BigDecimal("2"));
+    assertEquals(4, x.intValue());
+
+    CompressedBigDecimalObjectStrategy c1 = new CompressedBigDecimalObjectStrategy();
+    c1.compare(bd, add);
+  }
+
+  /**
+   * Test method for {@link CompressedBigDecimalObjectStrategy}.
+   */
+  @Test
+  public void testCompressedBigDecimalObjectStrategy()
+  {
+    ArrayCompressedBigDecimal bd;
+    ArrayCompressedBigDecimal acd = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
+    bd = acd;
+    CompressedBigDecimalObjectStrategy c1 = new CompressedBigDecimalObjectStrategy();
+
+    BigDecimal d = new BigDecimal(new BigInteger(1, new byte[] {0, 0, 1}));
+    ByteBuffer bb = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 0, 0, 4});
+    CompressedBigDecimal cbl = c1.fromByteBuffer(bb, 8);
+    byte[] bf = c1.toBytes(bd);
+    ArrayCompressedBigDecimal cbd = new ArrayCompressedBigDecimal(new BigDecimal(new BigInteger(1, bf)));
+
+    assertEquals(67108864, cbl.intValue());
+    assertEquals(0, c1.compare(bd, acd));
+    assertEquals(0, cbd.intValue());
+  }
+
+}
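The accumulate tests above treat the backing int[] as a little-endian two's complement number (index 0 holds the least significant 32 bits), propagate a carry between words, sign-extend a shorter right-hand side, and reject a right-hand side that is wider than the accumulator. The following is a minimal standalone sketch of that arithmetic, written only from the behaviour the tests assert. It is not the extension's implementation, and the class name CarryAddSketch is a hypothetical name used for illustration.

```java
import java.util.Arrays;

// Hypothetical illustration class; not part of the Druid extension.
final class CarryAddSketch
{
  // Adds rhs into lhs in place. Both arrays are little-endian two's complement.
  static void accumulate(int[] lhs, int[] rhs)
  {
    if (rhs.length > lhs.length) {
      // Mirrors the test that expects IllegalArgumentException for a wider addend.
      throw new IllegalArgumentException("right-hand side is wider than the accumulator");
    }
    if (rhs.length == 0) {
      return; // nothing to add
    }
    // Words beyond the end of rhs are filled with its sign (all ones if negative).
    long signExtension = rhs[rhs.length - 1] < 0 ? 0xffffffffL : 0L;
    long carry = 0;
    for (int i = 0; i < lhs.length; i++) {
      long right = i < rhs.length ? (rhs[i] & 0xffffffffL) : signExtension;
      long sum = (lhs[i] & 0xffffffffL) + right + carry; // unsigned 32-bit add
      lhs[i] = (int) sum;  // keep the low 32 bits
      carry = sum >>> 32;  // 0 or 1, carried into the next word
    }
    // A carry out of the top word is discarded, as in fixed-width arithmetic.
  }

  public static void main(String[] args)
  {
    int[] acc = {0x80000000, 0};
    accumulate(acc, new int[] {0x7fffffff, 0});
    accumulate(acc, new int[] {1, 0});
    System.out.println(Arrays.toString(acc));
  }
}
```

Masking each word with 0xffffffffL before adding treats it as unsigned, which is what lets a single long hold the 33-bit intermediate sum and expose the carry in bit 32.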
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java
new file mode 100644
index 0000000000..07feeab28e
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Resources;
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.ResultRow;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the compressed big decimal aggregator in groupBy queries.
+ */
+@RunWith(Parameterized.class)
+public class CompressedBigDecimalAggregatorGroupByTest
+{
+  private final AggregationTestHelper helper;
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
+
+  /**
+   * Constructor.
+   *
+   * @param config config object
+   */
+  public CompressedBigDecimalAggregatorGroupByTest(GroupByQueryConfig config)
+  {
+    CompressedBigDecimalModule module = new CompressedBigDecimalModule();
+    module.configure(null);
+    helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+        module.getJacksonModules(), config, tempFolder);
+  }
+
+  /**
+   * Constructor feeder.
+   *
+   * @return constructors
+   */
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<?> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+    for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
+      if ("v2ParallelCombine".equals(config.toString())) {
+        continue;
+      }
+      constructors.add(new Object[] {config});
+    }
+    return constructors;
+  }
+
+  /**
+   * Default setup of UTC timezone.
+   */
+  @BeforeClass
+  public static void setupClass()
+  {
+    System.setProperty("user.timezone", "UTC");
+  }
+
+  /**
+   * Ingestion test for a groupBy query over all rows.
+   *
+   * @throws IOException IOException
+   * @throws Exception   Exception
+   */
+  @Test
+  public void testIngestAndGroupByAllQuery() throws IOException, Exception
+  {
+
+    String groupByQueryJson = Resources.asCharSource(
+        this.getClass().getResource("/" + "bd_test_groupby_query.json"),
+        StandardCharsets.UTF_8
+    ).read();
+
+    Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+        this.getClass().getResourceAsStream("/" + "bd_test_data.csv"),
+        Resources.asCharSource(this.getClass().getResource(
+            "/" + "bd_test_data_parser.json"),
+            StandardCharsets.UTF_8
+        ).read(),
+        Resources.asCharSource(
+            this.getClass().getResource("/" + "bd_test_aggregators.json"),
+            StandardCharsets.UTF_8
+        ).read(),
+        0,
+        Granularities.NONE,
+        5,
+        groupByQueryJson
+    );
+
+    List<ResultRow> results = seq.toList();
+    assertThat(results, hasSize(1));
+    ResultRow row = results.get(0);
+    ObjectMapper mapper = helper.getObjectMapper();
+    GroupByQuery groupByQuery = mapper.readValue(groupByQueryJson, GroupByQuery.class);
+    MapBasedRow mapBasedRow = row.toMapBasedRow(groupByQuery);
+    Map<String, Object> event = mapBasedRow.getEvent();
+    assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))), mapBasedRow.getTimestamp());
+    assertThat(event, aMapWithSize(1));
+    assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
+  }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorTimeseriesTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorTimeseriesTest.java
new file mode 100644
index 0000000000..069919d1d6
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorTimeseriesTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.compressedbigdecimal;
+
+import com.google.common.collect.Iterables;
+import com.google.common.io.Resources;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Timeseries query tests for the compressedBigDecimal aggregator.
+ */
+public class CompressedBigDecimalAggregatorTimeseriesTest
+{
+  private final AggregationTestHelper helper;
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
+
+  /**
+   * Constructor.
+   */
+  public CompressedBigDecimalAggregatorTimeseriesTest()
+  {
+    CompressedBigDecimalModule module = new CompressedBigDecimalModule();
+    module.configure(null);
+    helper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
+            module.getJacksonModules(), tempFolder);
+  }
+
+  /**
+   * Default setup of UTC timezone.
+   */
+  @BeforeClass
+  public static void setupClass()
+  {
+    System.setProperty("user.timezone", "UTC");
+  }
+
+  /**
+   * Ingests the test data and runs a timeseries query over all of it.
+   *
+   * @throws IOException IOException
+   * @throws Exception   Exception
+   */
+  @Test
+  public void testIngestAndTimeseriesQuery() throws IOException, Exception
+  {
+    Sequence seq = helper.createIndexAndRunQueryOnSegment(
+        this.getClass().getResourceAsStream("/" + "bd_test_data.csv"),
+        Resources.asCharSource(
+            this.getClass().getResource("/" + "bd_test_data_parser.json"),
+            StandardCharsets.UTF_8
+        ).read(),
+        Resources.asCharSource(
+            this.getClass().getResource("/" + "bd_test_aggregators.json"),
+            StandardCharsets.UTF_8
+        ).read(),
+        0,
+        Granularities.NONE,
+        5,
+        Resources.asCharSource(
+            this.getClass().getResource("/" + "bd_test_timeseries_query.json"),
+            StandardCharsets.UTF_8
+        ).read()
+    );
+
+    TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
+    Map<String, Object> event = result.getBaseObject();
+    assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))),
+            ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getTimestamp());
+    assertThat(event, aMapWithSize(1));
+    assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
+  }
+
+  /**
+   * Test using multiple segments.
+   *
+   * @throws Exception an exception
+   */
+  @Test
+  public void testIngestMultipleSegmentsAndTimeseriesQuery() throws Exception
+  {
+    File segmentDir1 = tempFolder.newFolder();
+    helper.createIndex(
+            new File(this.getClass().getResource("/" + "bd_test_data.csv").getFile()),
+            Resources.asCharSource(this.getClass().getResource("/" + "bd_test_data_parser.json"),
+                    StandardCharsets.UTF_8).read(),
+            Resources.asCharSource(this.getClass().getResource("/" + "bd_test_aggregators.json"),
+                    StandardCharsets.UTF_8).read(),
+            segmentDir1,
+            0,
+            Granularities.NONE,
+            5);
+    File segmentDir2 = tempFolder.newFolder();
+    helper.createIndex(
+            new File(this.getClass().getResource("/" + "bd_test_zero_data.csv").getFile()),
+            Resources.asCharSource(this.getClass().getResource("/" + "bd_test_data_parser.json"),
+                    StandardCharsets.UTF_8).read(),
+            Resources.asCharSource(this.getClass().getResource("/" + "bd_test_aggregators.json"),
+                    StandardCharsets.UTF_8).read(),
+            segmentDir2,
+            0,
+            Granularities.NONE,
+            5);
+
+    Sequence seq = helper.runQueryOnSegments(
+            Arrays.asList(segmentDir1, segmentDir2),
+            Resources.asCharSource(
+                    this.getClass().getResource("/" + "bd_test_timeseries_query.json"),
+                    StandardCharsets.UTF_8
+            ).read());
+
+    TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
+    Map<String, Object> event = result.getBaseObject();
+    assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))),
+            ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getTimestamp());
+    assertThat(event, aMapWithSize(1));
+    assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
+
+  }
+}
+
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json
new file mode 100644
index 0000000000..fdbc0d409d
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json
@@ -0,0 +1,9 @@
+[
+    {
+        "type": "compressedBigDecimal",
+        "name": "revenue",
+        "fieldName": "revenue",
+        "scale": 9,
+        "size": 3
+    }
+]
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv
new file mode 100644
index 0000000000..b1fcbc5c59
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv
@@ -0,0 +1,6 @@
+20170101,mail,0.0
+20170101,sports,10.000000000
+20170101,mail,-1.000000000
+20170101,news,9999999999.000000000
+20170101,sports,5000000000.000000005
+20170101,mail,2.0
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json
new file mode 100644
index 0000000000..0671775b18
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json
@@ -0,0 +1,20 @@
+{
+    "type": "string",
+    "parseSpec": {
+        "format": "csv",
+        "timestampSpec": {
+            "column": "timestamp",
+            "format": "yyyyMMdd"
+        },
+        "dimensionsSpec": {
+            "dimensions": [
+                "property"
+            ]
+        },
+        "columns": [
+            "timestamp",
+            "property",
+            "revenue"
+        ]
+    }
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json
new file mode 100644
index 0000000000..cb57405c65
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json
@@ -0,0 +1,19 @@
+{
+    "queryType": "groupBy",
+    "dataSource": "test_datasource",
+    "granularity": "ALL",
+    "dimensions": [
+    ],
+    "aggregations": [
+        {
+            "type": "compressedBigDecimal",
+            "name": "revenue",
+            "fieldName": "revenue",
+            "scale": 9,
+            "size": 3
+        }
+    ],
+    "intervals": [
+        "2017-01-01T00:00:00.000Z/P1D"
+    ]
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_timeseries_query.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_timeseries_query.json
new file mode 100644
index 0000000000..92a1f4457b
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_timeseries_query.json
@@ -0,0 +1,25 @@
+{
+    "queryType": "timeseries",
+    "dataSource": "test_datasource",
+    "granularity": "ALL",
+    "aggregations": [
+        {
+            "type": "compressedBigDecimal",
+            "name": "revenue",
+            "fieldName": "revenue",
+            "scale": 9,
+            "size": 3
+        }
+    ],
+    "filter": {
+        "type": "not",
+        "field": {
+            "type": "selector",
+            "dimension": "property",
+            "value": "XXX"
+        }
+    },
+    "intervals": [
+        "2017-01-01T00:00:00.000Z/P1D"
+    ]
+}
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_zero_data.csv b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_zero_data.csv
new file mode 100644
index 0000000000..eec3f24c15
--- /dev/null
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_zero_data.csv
@@ -0,0 +1 @@
+20170101,XXX,0.0
diff --git a/pom.xml b/pom.xml
index 36e39d543f..ec6091c583 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,6 +186,7 @@
         <module>extensions-core/druid-ranger-security</module>
         <module>extensions-core/testing-tools</module>
         <!-- Community extensions -->
+        <module>extensions-contrib/compressed-bigdecimal</module>
         <module>extensions-contrib/influx-extensions</module>
         <module>extensions-contrib/cassandra-storage</module>
         <module>extensions-contrib/dropwizard-emitter</module>
diff --git a/website/.spelling b/website/.spelling
index c754cf913d..bc1117a849 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1065,6 +1065,7 @@ com.example
 common.runtime.properties
 druid-aws-rds-extensions
 druid-cassandra-storage
+druid-compressed-bigdecimal
 druid-distinctcount
 druid-ec2-extensions
 druid-kafka-extraction-namespace
@@ -2069,6 +2070,15 @@ CDF
 maxStreamLength
 toString
 100TB
+- ../docs/development/extensions-contrib/compressed-big-decimal.md
+compressedBigDecimal
+limitSpec
+metricsSpec
+postAggregations
+SaleAmount
+IngestionSpec
+druid-compressed-bigdecimal
+doubleSum
  - ../docs/querying/sql-functions.md
 ANY_VALUE 
 APPROX_COUNT_DISTINCT_DS_HLL 
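
Note on the expected value in the tests above: both test classes assert a
revenue of 15000000010.000000005 for bd_test_data.csv. The following is a
minimal sketch, not part of the commit, that reproduces that figure with
plain java.math.BigDecimal and compares it with a double-based sum. The
class name RevenueSumSketch and its methods are hypothetical; the input
values are copied from the CSV in the diff, and the scale of 9 is taken
from bd_test_aggregators.json.

```java
import java.math.BigDecimal;

// Hypothetical helper, not part of the Druid extension.
final class RevenueSumSketch {
    // Revenue column of bd_test_data.csv, copied from the diff.
    static final String[] REVENUES = {
        "0.0", "10.000000000", "-1.000000000",
        "9999999999.000000000", "5000000000.000000005", "2.0"
    };

    // Exact decimal sum, expressed at the aggregator's configured scale of 9.
    static BigDecimal exactSum() {
        BigDecimal sum = BigDecimal.ZERO;
        for (String r : REVENUES) {
            sum = sum.add(new BigDecimal(r));
        }
        return sum.setScale(9);
    }

    // The same sum in binary floating point, for comparison.
    static double doubleSum() {
        double sum = 0.0;
        for (String r : REVENUES) {
            sum += Double.parseDouble(r);
        }
        return sum;
    }

    public static void main(String[] args) {
        BigDecimal exact = exactSum();
        System.out.println("exact sum     = " + exact.toPlainString());
        System.out.println("double sum    = " + new BigDecimal(doubleSum()).toPlainString());
        // Number of bits in the unscaled integer, excluding the sign bit.
        System.out.println("unscaled bits = " + exact.unscaledValue().bitLength());
    }
}
```

The exact sum keeps the trailing 0.000000005 that a double cannot represent
at this magnitude. The unscaled integer (the sum multiplied by 10^9) also no
longer fits in a signed 64-bit long. If "size" counts 32-bit words, which is
an assumption the diff itself does not confirm, that would explain why the
test configuration uses "size": 3 rather than 2.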

