You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ch...@apache.org on 2022/09/14 02:14:45 UTC

[druid] branch master updated: Compressed Big Decimal Cleanup and Extension (#13048)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 54a2eb7dcc Compressed Big Decimal Cleanup and Extension (#13048)
54a2eb7dcc is described below

commit 54a2eb7dccbd4b395587f71625669c0f4cb3500b
Author: sr <sa...@gmail.com>
AuthorDate: Tue Sep 13 19:14:31 2022 -0700

    Compressed Big Decimal Cleanup and Extension (#13048)
    
    1. remove unnecessary generic type from CompressedBigDecimal
    2. support Number input types
    3. support aggregator reading supported input types directly (uningested
       data)
    4. fix scaling bug in buffer aggregator
---
 .idea/inspectionProfiles/Druid.xml                 | 19 ++---
 .../ArrayCompressedBigDecimal.java                 | 11 ++-
 .../ByteBufferCompressedBigDecimal.java            | 14 ++--
 .../compressedbigdecimal/CompressedBigDecimal.java | 50 +++++++++----
 .../CompressedBigDecimalAggregateCombiner.java     | 23 +++---
 .../CompressedBigDecimalAggregator.java            | 10 +--
 .../CompressedBigDecimalAggregatorFactory.java     | 36 ++++------
 .../CompressedBigDecimalBufferAggregator.java      | 17 +++--
 .../CompressedBigDecimalColumn.java                | 12 ++--
 .../CompressedBigDecimalJsonSerializer.java        |  1 -
 .../CompressedBigDecimalLongColumnSerializer.java  |  6 +-
 .../CompressedBigDecimalMetricSerde.java           | 33 +++------
 .../CompressedBigDecimalObjectStrategy.java        |  6 +-
 .../apache/druid/compressedbigdecimal/Utils.java   | 84 +++++++++++++++-------
 .../AggregatorCombinerFactoryTest.java             | 62 +++++++++-------
 .../CompressedBigDecimalAggregatorGroupByTest.java | 43 +++++++----
 .../src/test/resources/bd_test_aggregators.json    |  2 +-
 .../src/test/resources/bd_test_data.csv            | 12 ++--
 .../src/test/resources/bd_test_data_parser.json    | 15 +++-
 .../src/test/resources/bd_test_groupby_query.json  | 16 ++++-
 20 files changed, 286 insertions(+), 186 deletions(-)

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index d1f72dd276..c2e01b0931 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -70,7 +70,6 @@
       </option>
       <option name="IGNORE_FIELDS_USED_IN_MULTIPLE_METHODS" value="true" />
     </inspection_tool>
-    <inspection_tool class="FieldMayBeFinal" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="FinalStaticMethod" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="FlowJSError" enabled="false" level="Non-TeamCity Error" enabled_by_default="false" />
     <inspection_tool class="ForCanBeForeach" enabled="true" level="WARNING" enabled_by_default="true">
@@ -372,15 +371,15 @@
         <constraint name="l" within="" contains="" />
         <constraint name="y" minCount="0" maxCount="2147483647" within="" contains="" />
       </searchConfiguration>
-      <searchConfiguration name="Assign an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="$x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA" >
+      <searchConfiguration name="Assign an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="$x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA">
         <constraint name="__context__" within="" contains="" />
         <constraint name="x" nameOfExprType="java\.util\.concurrent\.Executor" within="" contains="" />
         <constraint name="y" nameOfExprType="java\.util\.concurrent\.ExecutorService" exprTypeWithinHierarchy="true" within="" contains="" />
       </searchConfiguration>
-      <searchConfiguration name="Intialize an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="Executor $x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA" >
+      <searchConfiguration name="Intialize an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="Executor $x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA">
         <constraint name="__context__" within="" contains="" />
-        <constraint name="x"  within="" contains="" />
-        <constraint name="y" nameOfExprType="java\.util\.concurrent\.ExecutorService" exprTypeWithinHierarchy="true"  within="" contains="" />
+        <constraint name="x" within="" contains="" />
+        <constraint name="y" nameOfExprType="java\.util\.concurrent\.ExecutorService" exprTypeWithinHierarchy="true" within="" contains="" />
       </searchConfiguration>
       <searchConfiguration name="Create a simple ExecutorService (not scheduled)" text="$x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA" pattern_context="default">
         <constraint name="__context__" within="" contains="" />
@@ -453,7 +452,7 @@
     <inspection_tool class="UnnecessaryCallToStringValueOf" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="UnnecessaryEnumModifier" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="UnnecessaryFullyQualifiedName" enabled="true" level="WARNING" enabled_by_default="true">
-      <scope name="NonGeneratedFiles" level="ERROR" enabled="true">
+      <scope name="NonGeneratedFiles" level="ERROR" enabled="true" editorAttributes="NOT_USED_ELEMENT_ATTRIBUTES">
         <option name="m_ignoreJavadoc" value="true" />
         <option name="ignoreInModuleStatements" value="true" />
       </scope>
@@ -461,6 +460,10 @@
       <option name="ignoreInModuleStatements" value="true" />
     </inspection_tool>
     <inspection_tool class="UnnecessaryInterfaceModifier" enabled="true" level="ERROR" enabled_by_default="true" />
+    <inspection_tool class="UnnecessaryLocalVariable" enabled="false" level="WARNING" enabled_by_default="false">
+      <option name="m_ignoreImmediatelyReturnedVariables" value="false" />
+      <option name="m_ignoreAnnotatedVariables" value="false" />
+    </inspection_tool>
     <inspection_tool class="UnnecessaryToStringCall" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="UnusedAssignment" enabled="true" level="ERROR" enabled_by_default="true">
       <option name="REPORT_PREFIX_EXPRESSIONS" value="true" />
@@ -480,8 +483,8 @@
     <inspection_tool class="XmlHighlighting" enabled="true" level="Non-TeamCity Warning" enabled_by_default="true" />
     <inspection_tool class="XmlInvalidId" enabled="true" level="Non-TeamCity Error" enabled_by_default="true" />
     <inspection_tool class="XmlPathReference" enabled="true" level="Non-TeamCity Error" enabled_by_default="true" />
-    <inspection_tool class="unused" enabled="true" level="WARNING" enabled_by_default="true" isSelected="false">
-      <scope name="UnusedInspectionsScope" level="ERROR" enabled="true" isSelected="false">
+    <inspection_tool class="unused" enabled="true" level="WARNING" enabled_by_default="true" checkParameterExcludingHierarchy="false" isSelected="false">
+      <scope name="UnusedInspectionsScope" level="ERROR" enabled="true" editorAttributes="NOT_USED_ELEMENT_ATTRIBUTES" checkParameterExcludingHierarchy="false" isSelected="false">
         <option name="LOCAL_VARIABLE" value="true" />
         <option name="FIELD" value="true" />
         <option name="METHOD" value="true" />
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java
index 5f514b5dd4..6168d3fc03 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ArrayCompressedBigDecimal.java
@@ -26,8 +26,7 @@ import java.math.BigInteger;
 /**
  * A compressed big decimal that holds its data with an embedded array.
  */
-@SuppressWarnings("serial")
-public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompressedBigDecimal>
+public class ArrayCompressedBigDecimal extends CompressedBigDecimal
 {
 
   private static final int BYTE_MASK = 0xff;
@@ -85,7 +84,7 @@ public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompres
    *
    * @param initVal the initial value
    */
-  public ArrayCompressedBigDecimal(CompressedBigDecimal<?> initVal)
+  public ArrayCompressedBigDecimal(CompressedBigDecimal initVal)
   {
     super(initVal.getScale());
     this.array = new int[initVal.getArraySize()];
@@ -134,6 +133,12 @@ public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompres
     return new ArrayCompressedBigDecimal(arr, scale);
   }
 
+  @Override
+  public CompressedBigDecimal toHeap()
+  {
+    return this;
+  }
+
   /* (non-Javadoc)
    * @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
    */
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java
index 542c9f92f0..b0727a8f04 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/ByteBufferCompressedBigDecimal.java
@@ -24,8 +24,7 @@ import java.nio.ByteBuffer;
 /**
  * A compressed big decimal that holds its data with an embedded array.
  */
-@SuppressWarnings("serial")
-public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBufferCompressedBigDecimal>
+public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal
 {
 
   private final ByteBuffer buf;
@@ -57,7 +56,7 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
    * @param position the position in the ByteBuffer
    * @param val      initial value
    */
-  public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, CompressedBigDecimal<?> val)
+  public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, CompressedBigDecimal val)
   {
     super(val.getScale());
     this.buf = buf;
@@ -67,6 +66,12 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
     copyToBuffer(buf, position, size, val);
   }
 
+  @Override
+  public CompressedBigDecimal toHeap()
+  {
+    return new ArrayCompressedBigDecimal(this);
+  }
+
   /* (non-Javadoc)
    * @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
    */
@@ -100,6 +105,7 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
     buf.putInt(position + idx * Integer.BYTES, val);
   }
 
+
   /**
    * Copy a compressed big decimal into a Bytebuffer in a format understood by this class.
    *
@@ -108,7 +114,7 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
    * @param size     The space (in number of ints) allocated for the value
    * @param val      THe value to copy
    */
-  public static void copyToBuffer(ByteBuffer buf, int position, int size, CompressedBigDecimal<?> val)
+  public static void copyToBuffer(ByteBuffer buf, int position, int size, CompressedBigDecimal val)
   {
     if (val.getArraySize() > size) {
       throw new IllegalArgumentException("Right hand side too big to fit in the result value");
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java
index 12165171df..256a813374 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimal.java
@@ -25,14 +25,10 @@ import java.util.Arrays;
 import java.util.function.ToIntBiFunction;
 
 /**
- * Mutable big decimal value that can be used to accumulate values without losing precision or reallocating memory. 
+ * Mutable big decimal value that can be used to accumulate values without losing precision or reallocating memory.
  * This helps in revenue based calculations
- *
- * @param <T> Type of actual derived class that contains the underlying data
  */
-@SuppressWarnings("serial")
-public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> extends Number
-    implements Comparable<CompressedBigDecimal<T>>
+public abstract class CompressedBigDecimal extends Number implements Comparable<CompressedBigDecimal>
 {
 
   private static final long INT_MASK = 0x00000000ffffffffL;
@@ -59,11 +55,10 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
    * than this value (the result), then the higher order bits are dropped, similar to
    * what happens when adding a long to an int and storing the result in an int.
    *
-   * @param <S> type of compressedbigdecimal to accumulate
    * @param rhs The object to accumulate
    * @return a reference to <b>this</b>
    */
-  public <S extends CompressedBigDecimal<S>> CompressedBigDecimal<T> accumulate(CompressedBigDecimal<S> rhs)
+  public CompressedBigDecimal accumulate(CompressedBigDecimal rhs)
   {
     if (rhs.scale != scale) {
       throw new IllegalArgumentException("Cannot accumulate MutableBigDecimals with differing scales");
@@ -72,7 +67,8 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
       throw new IllegalArgumentException("Right hand side too big to fit in the result value");
     }
     internalAdd(getArraySize(), this, CompressedBigDecimal::getArrayEntry, CompressedBigDecimal::setArrayEntry,
-        rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
+                rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry
+    );
     return this;
   }
 
@@ -81,7 +77,7 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
    *
    * @return this
    */
-  public CompressedBigDecimal<T> reset()
+  public CompressedBigDecimal reset()
   {
     for (int ii = 0; ii < getArraySize(); ++ii) {
       setArrayEntry(ii, 0);
@@ -105,8 +101,15 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
    * @param rhs    the object containing the right array data
    * @param rhsGet method reference to get an underlying right value
    */
-  static <R, S> void internalAdd(int llen, R lhs, ToIntBiFunction<R, Integer> lhsGet, ObjBiIntConsumer<R> lhsSet,
-                                 int rlen, S rhs, ToIntBiFunction<S, Integer> rhsGet)
+  static <R, S> void internalAdd(
+      int llen,
+      R lhs,
+      ToIntBiFunction<R, Integer> lhsGet,
+      ObjBiIntConsumer<R> lhsSet,
+      int rlen,
+      S rhs,
+      ToIntBiFunction<S, Integer> rhsGet
+  )
   {
     int commonLen = Integer.min(llen, rlen);
     long carry = 0;
@@ -179,6 +182,11 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
     return scale;
   }
 
+  /**
+   * @return a version of this object that is on heap. Returns this if already on-heap
+   */
+  public abstract CompressedBigDecimal toHeap();
+
   /**
    * Return the array size.
    *
@@ -239,6 +247,7 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
    * -1 if Negative
    * 0 if Zero
    * 1 if Positive
+   *
    * @param <S>     type of object containing the array
    * @param size    the underlying array size
    * @param rhs     object that contains the underlying array
@@ -266,15 +275,26 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
    * @see java.lang.Comparable#compareTo(java.lang.Object)
    */
   @Override
-  public int compareTo(CompressedBigDecimal<T> o)
+  public int compareTo(CompressedBigDecimal o)
   {
-
-    if (this.equals(o)) {
+    if (super.equals(o)) {
       return 0;
     }
     return this.toBigDecimal().compareTo(o.toBigDecimal());
   }
 
+  @Override
+  public int hashCode()
+  {
+    return toBigDecimal().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    return obj instanceof CompressedBigDecimal && toBigDecimal().equals(((CompressedBigDecimal) obj).toBigDecimal());
+  }
+
   /**
    * Returns the value of the specified number as an {@code int},
    * which may involve rounding or truncation.
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java
index 05d0451c0d..110c73cb42 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregateCombiner.java
@@ -28,18 +28,18 @@ import javax.annotation.Nullable;
 /**
  * AggregateCombiner for CompressedBigDecimals.
  */
-public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<CompressedBigDecimal<?>>
+public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<CompressedBigDecimal>
 {
-  private CompressedBigDecimal<?> sum;
+  private CompressedBigDecimal sum;
 
   @Override
   public void reset(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
   {
     @SuppressWarnings("unchecked")
-    ColumnValueSelector<CompressedBigDecimal<?>> selector =
-        (ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
+    ColumnValueSelector<CompressedBigDecimal> selector =
+        (ColumnValueSelector<CompressedBigDecimal>) columnValueSelector;
 
-    CompressedBigDecimal<?> cbd = selector.getObject();
+    CompressedBigDecimal cbd = selector.getObject();
     if (sum == null) {
       sum = new ArrayCompressedBigDecimal(cbd);
     } else {
@@ -52,10 +52,10 @@ public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<
   public void fold(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
   {
     @SuppressWarnings("unchecked")
-    ColumnValueSelector<CompressedBigDecimal<?>> selector =
-        (ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
+    ColumnValueSelector<CompressedBigDecimal> selector =
+        (ColumnValueSelector<CompressedBigDecimal>) columnValueSelector;
 
-    CompressedBigDecimal<?> cbd = selector.getObject();
+    CompressedBigDecimal cbd = selector.getObject();
 
     if (sum == null) {
       sum = new ArrayCompressedBigDecimal(cbd);
@@ -86,15 +86,14 @@ public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<
 
   @Nullable
   @Override
-  public CompressedBigDecimal<?> getObject()
+  public CompressedBigDecimal getObject()
   {
     return sum;
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public Class<CompressedBigDecimal<?>> classOfObject()
+  public Class<CompressedBigDecimal> classOfObject()
   {
-    return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
+    return CompressedBigDecimal.class;
   }
 }
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java
index 591131492f..bd6e148fd8 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregator.java
@@ -28,8 +28,8 @@ import org.apache.druid.segment.ColumnValueSelector;
 public class CompressedBigDecimalAggregator implements Aggregator
 {
 
-  private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
-  private final CompressedBigDecimal<?> sum;
+  private final ColumnValueSelector<CompressedBigDecimal> selector;
+  private final CompressedBigDecimal sum;
 
   /**
    * Constructor.
@@ -41,7 +41,7 @@ public class CompressedBigDecimalAggregator implements Aggregator
   public CompressedBigDecimalAggregator(
       int size,
       int scale,
-      ColumnValueSelector<CompressedBigDecimal<?>> selector
+      ColumnValueSelector<CompressedBigDecimal> selector
   )
   {
     this.selector = selector;
@@ -54,10 +54,10 @@ public class CompressedBigDecimalAggregator implements Aggregator
   @Override
   public void aggregate()
   {
-    CompressedBigDecimal<?> selectedObject = selector.getObject();
+    CompressedBigDecimal selectedObject = Utils.objToCompressedBigDecimal(selector.getObject());
     if (selectedObject != null) {
       if (selectedObject.getScale() != sum.getScale()) {
-        selectedObject = Utils.scaleUp(selectedObject);
+        selectedObject = Utils.scaleUp(selectedObject, sum.getScale());
       }
       sum.accumulate(selectedObject);
     }
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java
index a75f31e1b2..418f4c8791 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorFactory.java
@@ -44,22 +44,14 @@ import java.util.List;
  * An aggregator factory to generate longSum aggregator object.
  */
 public class CompressedBigDecimalAggregatorFactory
-    extends NullableNumericAggregatorFactory<ColumnValueSelector<CompressedBigDecimal<?>>>
+    extends NullableNumericAggregatorFactory<ColumnValueSelector<CompressedBigDecimal>>
 {
 
   public static final int DEFAULT_SCALE = 9;
   public static final int DEFAULT_SIZE = 3;
   private static final byte CACHE_TYPE_ID = 0x37;
 
-  public static final Comparator<CompressedBigDecimal<?>> COMPARATOR = new Comparator<CompressedBigDecimal<?>>()
-  {
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public int compare(CompressedBigDecimal lhs, CompressedBigDecimal rhs)
-    {
-      return lhs.compareTo(rhs);
-    }
-  };
+  public static final Comparator<CompressedBigDecimal> COMPARATOR = CompressedBigDecimal::compareTo;
 
   private final String name;
   private final String fieldName;
@@ -90,21 +82,21 @@ public class CompressedBigDecimalAggregatorFactory
 
   @SuppressWarnings("unchecked")
   @Override
-  protected ColumnValueSelector<CompressedBigDecimal<?>> selector(ColumnSelectorFactory metricFactory)
+  protected ColumnValueSelector<CompressedBigDecimal> selector(ColumnSelectorFactory metricFactory)
   {
-    return (ColumnValueSelector<CompressedBigDecimal<?>>) metricFactory.makeColumnValueSelector(fieldName);
+    return (ColumnValueSelector<CompressedBigDecimal>) metricFactory.makeColumnValueSelector(fieldName);
   }
 
   @Override
   protected Aggregator factorize(ColumnSelectorFactory metricFactory,
-                                 @Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
+                                 @Nonnull ColumnValueSelector<CompressedBigDecimal> selector)
   {
     return new CompressedBigDecimalAggregator(size, scale, selector);
   }
 
   @Override
   protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory,
-                                               @Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
+                                               @Nonnull ColumnValueSelector<CompressedBigDecimal> selector)
   {
     return new CompressedBigDecimalBufferAggregator(size, scale, selector);
   }
@@ -113,7 +105,7 @@ public class CompressedBigDecimalAggregatorFactory
    * @see org.apache.druid.query.aggregation.AggregatorFactory#getComparator()
    */
   @Override
-  public Comparator<CompressedBigDecimal<?>> getComparator()
+  public Comparator<CompressedBigDecimal> getComparator()
   {
     return COMPARATOR;
   }
@@ -137,9 +129,9 @@ public class CompressedBigDecimalAggregatorFactory
       // due to truncation when the deserialized objects aren't big enough to hold the accumlated result.
       // The most common case this avoids is deserializing 0E-9 into a CompressedBigDecimal with array
       // size 1 and then accumulating a larger value into it.
-      CompressedBigDecimal<?> retVal = ArrayCompressedBigDecimal.allocate(size, scale);
-      CompressedBigDecimal<?> left = (CompressedBigDecimal<?>) lhs;
-      CompressedBigDecimal<?> right = (CompressedBigDecimal<?>) rhs;
+      CompressedBigDecimal retVal = ArrayCompressedBigDecimal.allocate(size, scale);
+      CompressedBigDecimal left = (CompressedBigDecimal) lhs;
+      CompressedBigDecimal right = (CompressedBigDecimal) rhs;
       if (left.signum() != 0) {
         retVal.accumulate(left);
       }
@@ -160,7 +152,7 @@ public class CompressedBigDecimalAggregatorFactory
   }
 
   @Override
-  public AggregateCombiner<CompressedBigDecimal<?>> makeAggregateCombiner()
+  public AggregateCombiner<CompressedBigDecimal> makeAggregateCombiner()
   {
     return new CompressedBigDecimalAggregateCombiner();
   }
@@ -214,7 +206,7 @@ public class CompressedBigDecimalAggregatorFactory
   {
     return ValueType.COMPLEX;
   }
-  
+
   /* (non-Javadoc)
    * @see org.apache.druid.query.aggregation.AggregatorFactory#getTypeName()
    */
@@ -223,7 +215,7 @@ public class CompressedBigDecimalAggregatorFactory
   {
     return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
   }
- 
+
   /* (non-Javadoc)
    * @see org.apache.druid.query.aggregation.AggregatorFactory#getCacheKey()
    */
@@ -240,7 +232,7 @@ public class CompressedBigDecimalAggregatorFactory
   @Override
   public Object finalizeComputation(Object object)
   {
-    CompressedBigDecimal<?> compressedBigDecimal = (CompressedBigDecimal<?>) object;
+    CompressedBigDecimal compressedBigDecimal = (CompressedBigDecimal) object;
     BigDecimal bigDecimal = compressedBigDecimal.toBigDecimal();
     return bigDecimal.compareTo(BigDecimal.ZERO) == 0 ? 0 : bigDecimal;
   }
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java
index f5f2de44a3..2c4acd612e 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalBufferAggregator.java
@@ -32,7 +32,7 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
 
   //Cache will hold the aggregated value.
   //We are using ByteBuffer to hold the key to the aggregated value.
-  private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
+  private final ColumnValueSelector<CompressedBigDecimal> selector;
   private final int size;
   private final int scale;
 
@@ -46,7 +46,7 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
   public CompressedBigDecimalBufferAggregator(
       int size,
       int scale,
-      ColumnValueSelector<CompressedBigDecimal<?>> selector
+      ColumnValueSelector<CompressedBigDecimal> selector
   )
   {
     this.selector = selector;
@@ -71,7 +71,7 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
-    CompressedBigDecimal<?> addend = selector.getObject();
+    CompressedBigDecimal addend = Utils.objToCompressedBigDecimal(selector.getObject());
     if (addend != null) {
       Utils.accumulate(buf, position, size, scale, addend);
     }
@@ -83,7 +83,16 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
   @Override
   public Object get(ByteBuffer buf, int position)
   {
-    return new ByteBufferCompressedBigDecimal(buf, position, size, scale);
+    ByteBufferCompressedBigDecimal byteBufferCompressedBigDecimal = new ByteBufferCompressedBigDecimal(
+        buf,
+        position,
+        size,
+        scale
+    );
+
+    CompressedBigDecimal heapCompressedBigDecimal = byteBufferCompressedBigDecimal.toHeap();
+
+    return heapCompressedBigDecimal;
   }
 
   /* (non-Javadoc)
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
index 52833e0556..b6d7d029be 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
@@ -68,7 +68,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn
   }
 
   @Override
-  public CompressedBigDecimal<?> getRowValue(int rowNum)
+  public CompressedBigDecimal getRowValue(int rowNum)
   {
     int s = scale.get(rowNum);
 
@@ -91,20 +91,20 @@ public class CompressedBigDecimalColumn implements ComplexColumn
   }
 
   @Override
-  public ColumnValueSelector<?> makeColumnValueSelector(final ReadableOffset offset)
+  public ColumnValueSelector makeColumnValueSelector(final ReadableOffset offset)
   {
-    return new ObjectColumnSelector<CompressedBigDecimal<?>>()
+    return new ObjectColumnSelector<CompressedBigDecimal>()
     {
       @Override @Nullable
-      public CompressedBigDecimal<?> getObject()
+      public CompressedBigDecimal getObject()
       {
         return getRowValue(offset.getOffset());
       }
 
       @Override @SuppressWarnings("unchecked")
-      public Class<CompressedBigDecimal<?>> classOfObject()
+      public Class<CompressedBigDecimal> classOfObject()
       {
-        return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
+        return (Class<CompressedBigDecimal>) (Class) CompressedBigDecimal.class;
       }
 
       @Override
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java
index 49b3ed23da..4d9cb92aa8 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalJsonSerializer.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 /**
  * CompressedBigDecimal json serializer.
  */
-@SuppressWarnings("rawtypes")
 public class CompressedBigDecimalJsonSerializer extends JsonSerializer<CompressedBigDecimal>
 {
 
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
index e31f845e0f..e6b6539fe3 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
@@ -36,7 +36,7 @@ import java.util.Locale;
 /**
  * Column Serializer that understands converting CompressedBigDecimal to 4 byte long values for better storage.
  */
-public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSerializer<CompressedBigDecimal<?>>
+public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSerializer<CompressedBigDecimal>
 {
   private static final byte VERSION = CompressedBigDecimalColumnPartSupplier.VERSION;
 
@@ -92,9 +92,9 @@ public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSe
   }
 
   @Override
-  public void serialize(ColumnValueSelector<? extends CompressedBigDecimal<?>> obj) throws IOException
+  public void serialize(ColumnValueSelector<? extends CompressedBigDecimal> obj) throws IOException
   {
-    CompressedBigDecimal<?> abd = obj.getObject();
+    CompressedBigDecimal abd = obj.getObject();
     int[] array = new int[abd.getArraySize()];
     for (int ii = 0; ii < abd.getArraySize(); ++ii) {
       array[ii] = abd.getArrayEntry(ii);
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java
index d1193111c9..91f2f1a348 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalMetricSerde.java
@@ -20,14 +20,12 @@
 package org.apache.druid.compressedbigdecimal;
 
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.ComplexMetricExtractor;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 /**
@@ -36,7 +34,7 @@ import java.nio.ByteBuffer;
 public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
 {
 
-  private CompressedBigDecimalObjectStrategy strategy = new CompressedBigDecimalObjectStrategy();
+  private final CompressedBigDecimalObjectStrategy strategy = new CompressedBigDecimalObjectStrategy();
 
   /* (non-Javadoc)
    * @see ComplexMetricSerde#getTypeName()
@@ -48,32 +46,22 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
   }
 
   @Override
-  public ComplexMetricExtractor<CompressedBigDecimal<?>> getExtractor()
+  public ComplexMetricExtractor<CompressedBigDecimal> getExtractor()
   {
-    return new ComplexMetricExtractor<CompressedBigDecimal<?>>()
+    return new ComplexMetricExtractor<CompressedBigDecimal>()
     {
-      @SuppressWarnings("unchecked")
       @Override
-      public Class<CompressedBigDecimal<?>> extractedClass()
+      public Class<CompressedBigDecimal> extractedClass()
       {
-        return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
+        return CompressedBigDecimal.class;
       }
 
       @Override
-      public CompressedBigDecimal<?> extractValue(InputRow inputRow, String metricName)
+      public CompressedBigDecimal extractValue(InputRow inputRow, String metricName)
       {
         Object rawMetric = inputRow.getRaw(metricName);
-        if (rawMetric == null) {
-          return null;
-        } else if (rawMetric instanceof BigDecimal) {
-          return new ArrayCompressedBigDecimal((BigDecimal) rawMetric);
-        } else if (rawMetric instanceof String) {
-          return new ArrayCompressedBigDecimal(new BigDecimal((String) rawMetric));
-        } else if (rawMetric instanceof CompressedBigDecimal<?>) {
-          return (CompressedBigDecimal<?>) rawMetric;
-        } else {
-          throw new ISE("Unknown extraction value type: [%s]", rawMetric.getClass().getSimpleName());
-        }
+
+        return Utils.objToCompressedBigDecimal(rawMetric);
       }
     };
   }
@@ -95,7 +83,8 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
   @Override
   public CompressedBigDecimalLongColumnSerializer getSerializer(
       SegmentWriteOutMedium segmentWriteOutMedium,
-      String column)
+      String column
+  )
   {
     return CompressedBigDecimalLongColumnSerializer.create(segmentWriteOutMedium, column);
   }
@@ -104,7 +93,7 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
    * @see ComplexMetricSerde#getObjectStrategy()
    */
   @Override
-  public ObjectStrategy<CompressedBigDecimal<?>> getObjectStrategy()
+  public ObjectStrategy<CompressedBigDecimal> getObjectStrategy()
   {
     return strategy;
   }
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java
index 8ef284b82b..829c1f94d5 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalObjectStrategy.java
@@ -28,7 +28,7 @@ import java.nio.IntBuffer;
 /**
  * Defines strategy on how to read and write data from deep storage.
  */
-public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<CompressedBigDecimal<?>>
+public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<CompressedBigDecimal>
 {
 
   /*
@@ -61,7 +61,7 @@ public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<Compre
    * @see org.apache.druid.segment.data.ObjectStrategy#fromByteBuffer(java.nio.ByteBuffer, int)
    */
   @Override
-  public CompressedBigDecimal<?> fromByteBuffer(ByteBuffer buffer, int numBytes)
+  public CompressedBigDecimal fromByteBuffer(ByteBuffer buffer, int numBytes)
   {
     ByteBuffer myBuf = buffer.slice();
     myBuf.order(ByteOrder.LITTLE_ENDIAN);
@@ -79,7 +79,7 @@ public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<Compre
    * @see org.apache.druid.segment.data.ObjectStrategy#toBytes(java.lang.Object)
    */
   @Override
-  public byte[] toBytes(CompressedBigDecimal<?> val)
+  public byte[] toBytes(CompressedBigDecimal val)
   {
     ByteBuffer buf = ByteBuffer.allocate(4 * (val.getArraySize() + 1));
     buf.order(ByteOrder.LITTLE_ENDIAN);
diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java
index 2e86504c56..c38b3a3105 100644
--- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java
+++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/Utils.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.compressedbigdecimal;
 
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.data.IndexedInts;
 
 import java.math.BigDecimal;
@@ -37,15 +38,13 @@ public class Utils
    * than this value (the result), then the higher order bits are dropped, similar to
    * what happens when adding a long to an int and storing the result in an int.
    *
-   * @param <S> Type of CompressedBigDecimal into which to accumulate
    * @param lhs The object into which to accumulate
    * @param rhs The object to accumulate
    * @return a reference to <b>this</b>
    */
-  public static <S extends CompressedBigDecimal<S>>
-      CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, BigDecimal rhs)
+  public static CompressedBigDecimal accumulate(CompressedBigDecimal lhs, BigDecimal rhs)
   {
-    CompressedBigDecimal<ArrayCompressedBigDecimal> abd =
+    CompressedBigDecimal abd =
         new ArrayCompressedBigDecimal(rhs.setScale(lhs.getScale()));
     return lhs.accumulate(abd);
   }
@@ -58,36 +57,33 @@ public class Utils
    * than this value (the result), then the higher order bits are dropped, similar to
    * what happens when adding a long to an int and storing the result in an int.
    *
-   * @param <S>      Type of CompressedBigDecimal into which to accumulate
    * @param lhs      The object into which to accumulate
    * @param rhs      The object to accumulate
    * @param rhsScale The scale to apply to the long being accumulated
    * @return a reference to <b>this</b>
    */
-  public static <S extends CompressedBigDecimal<S>>
-      CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, long rhs, int rhsScale)
+  public static CompressedBigDecimal accumulate(CompressedBigDecimal lhs, long rhs, int rhsScale)
   {
-    CompressedBigDecimal<ArrayCompressedBigDecimal> abd = new ArrayCompressedBigDecimal(rhs, rhsScale);
+    CompressedBigDecimal abd = new ArrayCompressedBigDecimal(rhs, rhsScale);
     return lhs.accumulate(abd);
   }
 
   /**
    * Accumulate using IndexedInts read from Druid's segment file.
    *
-   * @param <S>      Type of CompressedBigDecimal into which to accumulate
    * @param lhs      The object into which to accumulate
    * @param rhs      IndexedInts representing array of magnitude values
    * @param rhsScale the scale
    * @return a reference to <b>this</b>
    */
-  public static <S extends CompressedBigDecimal<S>>
-      CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, IndexedInts rhs, int rhsScale)
+  public static CompressedBigDecimal accumulate(CompressedBigDecimal lhs, IndexedInts rhs, int rhsScale)
   {
     if (rhs.size() > lhs.getArraySize()) {
       throw new IllegalArgumentException("Right hand side too big to fit in the result value");
     }
     CompressedBigDecimal.internalAdd(lhs.getArraySize(), lhs, CompressedBigDecimal::getArrayEntry,
-        CompressedBigDecimal::setArrayEntry, rhs.size(), rhs, IndexedInts::get);
+                                     CompressedBigDecimal::setArrayEntry, rhs.size(), rhs, IndexedInts::get
+    );
     return lhs;
   }
 
@@ -100,45 +96,79 @@ public class Utils
    * @param lhsScale The scale of the left
    * @param rhs      the right side to accumlate
    */
-  public static void accumulate(ByteBuffer buf, int pos, int lhsSize, int lhsScale, CompressedBigDecimal<?> rhs)
+  public static void accumulate(ByteBuffer buf, int pos, int lhsSize, int lhsScale, CompressedBigDecimal rhs)
   {
     if (rhs.getArraySize() > lhsSize) {
       throw new IllegalArgumentException("Right hand side too big to fit in the result value");
     }
     BufferAccessor accessor = BufferAccessor.prepare(pos);
-    CompressedBigDecimal.internalAdd(lhsSize, buf, accessor, accessor,
-        rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
+    if (rhs.getScale() != lhsScale) {
+      rhs = Utils.scaleUp(rhs);
+    }
+    CompressedBigDecimal.internalAdd(
+        lhsSize,
+        buf,
+        accessor,
+        accessor,
+        rhs.getArraySize(),
+        rhs,
+        CompressedBigDecimal::getArrayEntry
+    );
   }
 
   /**
    * Returns a {@code CompressedBigDecimal} whose scale is moderated as per the default scale.
    *
-   * @param <S> Type of CompressedBigDecimal to scale
    * @param val The value to scale up
    * @return Scaled up compressedBigDecimal
    */
-  public static <S extends CompressedBigDecimal<S>>
-      CompressedBigDecimal<ArrayCompressedBigDecimal> scaleUp(CompressedBigDecimal<S> val)
+  public static CompressedBigDecimal scaleUp(CompressedBigDecimal val)
   {
     return new ArrayCompressedBigDecimal(
         val.toBigDecimal().setScale(CompressedBigDecimalAggregatorFactory.DEFAULT_SCALE, BigDecimal.ROUND_UP)
     );
   }
 
+  public static CompressedBigDecimal scaleUp(CompressedBigDecimal val, int scale)
+  {
+    return new ArrayCompressedBigDecimal(
+        val.toBigDecimal().setScale(scale, BigDecimal.ROUND_UP)
+    );
+  }
+
+  public static CompressedBigDecimal objToCompressedBigDecimal(Object obj)
+  {
+    CompressedBigDecimal result;
+    if (obj == null) {
+      result = null;
+    } else if (obj instanceof BigDecimal) {
+      result = new ArrayCompressedBigDecimal((BigDecimal) obj);
+    } else if (obj instanceof Long) {
+      result = new ArrayCompressedBigDecimal(new BigDecimal((Long) obj));
+    } else if (obj instanceof Integer) {
+      result = new ArrayCompressedBigDecimal(new BigDecimal((Integer) obj));
+    } else if (obj instanceof Double) {
+      result = new ArrayCompressedBigDecimal(BigDecimal.valueOf((Double) obj));
+    } else if (obj instanceof Float) {
+      result = new ArrayCompressedBigDecimal(BigDecimal.valueOf((Float) obj));
+    } else if (obj instanceof String) {
+      result = new ArrayCompressedBigDecimal(new BigDecimal((String) obj));
+    } else if (obj instanceof CompressedBigDecimal) {
+      result = (CompressedBigDecimal) obj;
+    } else {
+      throw new ISE("Unknown extraction value type: [%s]", obj.getClass().getSimpleName());
+    }
+
+    return result;
+  }
+
   /**
    * Helper class that maintains a cache of thread local objects that can be used to access
    * a ByteBuffer in {@link Utils#accumulate(ByteBuffer, int, int, int, CompressedBigDecimal)}.
    */
   private static class BufferAccessor implements ToIntBiFunction<ByteBuffer, Integer>, ObjBiIntConsumer<ByteBuffer>
   {
-    private static ThreadLocal<BufferAccessor> cache = new ThreadLocal<BufferAccessor>()
-    {
-      @Override
-      protected BufferAccessor initialValue()
-      {
-        return new BufferAccessor();
-      }
-    };
+    private static final ThreadLocal<BufferAccessor> CACHE = ThreadLocal.withInitial(BufferAccessor::new);
 
     private int position = 0;
 
@@ -150,7 +180,7 @@ public class Utils
      */
     public static BufferAccessor prepare(int position)
     {
-      BufferAccessor accessor = cache.get();
+      BufferAccessor accessor = CACHE.get();
       accessor.position = position;
       return accessor;
     }
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java
index 407e59dfaf..6141da91a5 100644
--- a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java
+++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/AggregatorCombinerFactoryTest.java
@@ -24,15 +24,13 @@ import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.ColumnarMultiInts;
 import org.apache.druid.segment.data.ReadableOffset;
 import org.easymock.EasyMock;
+import org.junit.Assert;
 import org.junit.Test;
+
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
 
 public class AggregatorCombinerFactoryTest
 {
@@ -46,10 +44,10 @@ public class AggregatorCombinerFactoryTest
     ColumnarInts ci = EasyMock.createMock(ColumnarInts.class);
     ReadableOffset ro = EasyMock.createMock(ReadableOffset.class);
     CompressedBigDecimalColumn cbr = new CompressedBigDecimalColumn(ci, cmi);
-    assertEquals(CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, cbr.getTypeName());
-    assertEquals(0, cbr.getLength());
-    assertEquals(CompressedBigDecimalColumn.class, cbr.getClazz());
-    assertNotNull(cbr.makeColumnValueSelector(ro));
+    Assert.assertEquals(CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, cbr.getTypeName());
+    Assert.assertEquals(0, cbr.getLength());
+    Assert.assertEquals(CompressedBigDecimalColumn.class, cbr.getClazz());
+    Assert.assertNotNull(cbr.makeColumnValueSelector(ro));
   }
 
   /**
@@ -59,23 +57,35 @@ public class AggregatorCombinerFactoryTest
   public void testCompressedBigDecimalAggregatorFactory()
   {
     CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
-    assertEquals("CompressedBigDecimalAggregatorFactory{name='name', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}", cf.toString());
-    assertNotNull(cf.getCacheKey());
-    assertNull(cf.deserialize(null));
-    assertEquals("5", cf.deserialize(new BigDecimal(5)).toString());
-    assertEquals("5", cf.deserialize(5d).toString());
-    assertEquals("5", cf.deserialize("5").toString());
-    assertEquals("[CompressedBigDecimalAggregatorFactory{name='fieldName', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}]", Arrays.toString(cf.getRequiredColumns().toArray()));
-    assertEquals("0", cf.combine(null, null).toString());
-    assertEquals("4", cf.combine(new BigDecimal(4), null).toString());
-    assertEquals("4", cf.combine(null, new BigDecimal(4)).toString());
-    assertEquals("8", cf.combine(new ArrayCompressedBigDecimal(new BigDecimal(4)), new ArrayCompressedBigDecimal(new BigDecimal(4))).toString());
+    Assert.assertEquals(
+        "CompressedBigDecimalAggregatorFactory{name='name', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}",
+        cf.toString()
+    );
+    Assert.assertNotNull(cf.getCacheKey());
+    Assert.assertNull(cf.deserialize(null));
+    Assert.assertEquals("5", cf.deserialize(new BigDecimal(5)).toString());
+    Assert.assertEquals("5", cf.deserialize(5d).toString());
+    Assert.assertEquals("5", cf.deserialize("5").toString());
+    Assert.assertEquals(
+        "[CompressedBigDecimalAggregatorFactory{name='fieldName', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}]",
+        Arrays.toString(cf.getRequiredColumns().toArray())
+    );
+    Assert.assertEquals("0", cf.combine(null, null).toString());
+    Assert.assertEquals("4", cf.combine(new BigDecimal(4), null).toString());
+    Assert.assertEquals("4", cf.combine(null, new BigDecimal(4)).toString());
+    Assert.assertEquals(
+        "8",
+        cf.combine(
+            new ArrayCompressedBigDecimal(new BigDecimal(4)),
+            new ArrayCompressedBigDecimal(new BigDecimal(4))
+        ).toString()
+    );
   }
 
   /**
    * Test method for {@link CompressedBigDecimalAggregatorFactory#deserialize(Object)}.
    */
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testCompressedBigDecimalAggregatorFactoryDeserialize()
   {
     CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
@@ -85,10 +95,10 @@ public class AggregatorCombinerFactoryTest
   /**
    * Test method for {@link CompressedBigDecimalBufferAggregator#getFloat(ByteBuffer, int)}
    */
-  @Test (expected = UnsupportedOperationException.class)
+  @Test(expected = UnsupportedOperationException.class)
   public void testCompressedBigDecimalBufferAggregatorGetFloat()
   {
-    ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
+    ColumnValueSelector<CompressedBigDecimal> cs = EasyMock.createMock(ColumnValueSelector.class);
     ByteBuffer bbuf = ByteBuffer.allocate(10);
     CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
     ca.getFloat(bbuf, 0);
@@ -97,10 +107,10 @@ public class AggregatorCombinerFactoryTest
   /**
    * Test method for {@link CompressedBigDecimalBufferAggregator#getLong(ByteBuffer, int)}
    */
-  @Test (expected = UnsupportedOperationException.class)
+  @Test(expected = UnsupportedOperationException.class)
   public void testCompressedBigDecimalBufferAggregatorGetLong()
   {
-    ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
+    ColumnValueSelector<CompressedBigDecimal> cs = EasyMock.createMock(ColumnValueSelector.class);
     ByteBuffer bbuf = ByteBuffer.allocate(10);
     CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
     ca.getLong(bbuf, 0);
@@ -114,7 +124,7 @@ public class AggregatorCombinerFactoryTest
   {
     CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
     CompressedBigDecimal c = cc.getObject();
-    assertSame(null, c);
+    Assert.assertSame(null, c);
   }
 
   /**
@@ -124,7 +134,7 @@ public class AggregatorCombinerFactoryTest
   public void testCompressedBigDecimalAggregateCombinerClassofObject()
   {
     CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
-    assertSame(CompressedBigDecimalAggregateCombiner.class, cc.getClass());
+    Assert.assertSame(CompressedBigDecimalAggregateCombiner.class, cc.getClass());
   }
 
   /**
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java
index 07feeab28e..500429bb97 100644
--- a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java
+++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalAggregatorGroupByTest.java
@@ -29,8 +29,12 @@ import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.ResultRow;
+import org.hamcrest.collection.IsCollectionWithSize;
+import org.hamcrest.collection.IsMapContaining;
+import org.hamcrest.collection.IsMapWithSize;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,11 +52,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
-import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
-import static org.hamcrest.collection.IsMapContaining.hasEntry;
-import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 /**
  * Unit tests for AccumulatingDecimalAggregator.
@@ -88,10 +87,7 @@ public class CompressedBigDecimalAggregatorGroupByTest
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      if ("v2ParallelCombine".equals(config.toString())) {
-        continue;
-      }
-      constructors.add(new Object[] {config});
+      constructors.add(new Object[]{config});
     }
     return constructors;
   }
@@ -122,8 +118,9 @@ public class CompressedBigDecimalAggregatorGroupByTest
 
     Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
         this.getClass().getResourceAsStream("/" + "bd_test_data.csv"),
-        Resources.asCharSource(this.getClass().getResource(
-            "/" + "bd_test_data_parser.json"),
+        Resources.asCharSource(
+            this.getClass().getResource(
+                "/" + "bd_test_data_parser.json"),
             StandardCharsets.UTF_8
         ).read(),
         Resources.asCharSource(
@@ -137,14 +134,30 @@ public class CompressedBigDecimalAggregatorGroupByTest
     );
 
     List<ResultRow> results = seq.toList();
-    assertThat(results, hasSize(1));
+    Assert.assertThat(results, IsCollectionWithSize.hasSize(1));
     ResultRow row = results.get(0);
     ObjectMapper mapper = helper.getObjectMapper();
     GroupByQuery groupByQuery = mapper.readValue(groupByQueryJson, GroupByQuery.class);
     MapBasedRow mapBasedRow = row.toMapBasedRow(groupByQuery);
     Map<String, Object> event = mapBasedRow.getEvent();
-    assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))), mapBasedRow.getTimestamp());
-    assertThat(event, aMapWithSize(1));
-    assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
+    Assert.assertEquals(
+        new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))),
+        mapBasedRow.getTimestamp()
+    );
+    Assert.assertThat(event, IsMapWithSize.aMapWithSize(3));
+    Assert.assertThat(
+        event,
+        IsMapContaining.hasEntry("cbdRevenueFromString", new BigDecimal("15000000010.000000005"))
+    );
+    // long conversion of 5000000000.000000005 results in null/0 value
+    Assert.assertThat(
+        event,
+        IsMapContaining.hasEntry("cbdRevenueFromLong", new BigDecimal("10000000010.000000000"))
+    );
+    // double input changes 5000000000.000000005 to 5000000000.5 to fit in double mantissa space
+    Assert.assertThat(
+        event,
+        IsMapContaining.hasEntry("cbdRevenueFromDouble", new BigDecimal("15000000010.500000000"))
+    );
   }
 }
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json
index fdbc0d409d..9db4490968 100644
--- a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_aggregators.json
@@ -1,7 +1,7 @@
 [
     {
         "type": "compressedBigDecimal",
-        "name": "revenue",
+        "name": "bigDecimalRevenue",
         "fieldName": "revenue",
         "scale": 9,
         "size": 3
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv
index b1fcbc5c59..418f166b5a 100644
--- a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data.csv
@@ -1,6 +1,6 @@
-20170101,mail,0.0
-20170101,sports,10.000000000
-20170101,mail,-1.000000000
-20170101,news,9999999999.000000000
-20170101,sports,5000000000.000000005
-20170101,mail,2.0
+20170101,mail,0.0,0.0,0.0
+20170101,sports,10.000000000,10.000000000,10.000000000
+20170101,mail,-1.000000000,-1.000000000,-1.000000000
+20170101,news,9999999999.000000000,9999999999.000000000,9999999999.000000000
+20170101,sports,5000000000.000000005,5000000000.000000005,5000000000.5
+20170101,mail,2.0,2.0,2.0
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json
index 0671775b18..c1a6e24dfc 100644
--- a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_data_parser.json
@@ -8,13 +8,24 @@
         },
         "dimensionsSpec": {
             "dimensions": [
-                "property"
+                "property",
+                "revenue",
+                {
+                    "type": "long",
+                    "name": "longRevenue"
+                },
+                {
+                    "type": "double",
+                    "name": "doubleRevenue"
+                }
             ]
         },
         "columns": [
             "timestamp",
             "property",
-            "revenue"
+            "revenue",
+            "longRevenue",
+            "doubleRevenue"
         ]
     }
 }
diff --git a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json
index cb57405c65..471167e8dd 100644
--- a/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json
+++ b/extensions-contrib/compressed-bigdecimal/src/test/resources/bd_test_groupby_query.json
@@ -7,10 +7,24 @@
     "aggregations": [
         {
             "type": "compressedBigDecimal",
-            "name": "revenue",
+            "name": "cbdRevenueFromString",
             "fieldName": "revenue",
             "scale": 9,
             "size": 3
+        },
+        {
+            "type": "compressedBigDecimal",
+            "name": "cbdRevenueFromLong",
+            "fieldName": "longRevenue",
+            "scale": 9,
+            "size": 3
+        },
+        {
+            "type": "compressedBigDecimal",
+            "name": "cbdRevenueFromDouble",
+            "fieldName": "doubleRevenue",
+            "scale": 9,
+            "size": 3
         }
     ],
     "intervals": [


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org