You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/30 03:12:32 UTC

[pinot] branch master updated: Support single-valued BigDecimal in schema, type conversion, SQL statements and minimum set of transforms. (#8503)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d282e45523 Support single-valued BigDecimal in schema, type conversion, SQL statements and minimum set of transforms. (#8503)
d282e45523 is described below

commit d282e455234ea3964d24235c5e45ec4d4d275a4f
Author: nizarhejazi <96...@users.noreply.github.com>
AuthorDate: Fri Apr 29 20:12:27 2022 -0700

    Support single-valued BigDecimal in schema, type conversion, SQL statements and minimum set of transforms. (#8503)
    
    - Support single-valued BigDecimal in schema, and type conversion so that data can be ingested as `BIG_DECIMAL`.
    - Support BigDecimal at both data access layer and data storage layer, Dictionary and ForwardIndexReader.
    - Support `BIG_DECIMAL` in SQL statements:
       - SELECT
       - ORDER BY
       - GROUP BY
       - DISTINCT
       - COUNT
    - Support `BIG_DECIMAL` in minimum set of transforms to unblock ingestion and testing:
        - JsonExtractScalar
        - DefaultJsonPathEvaluator
        - LiteralTransform
        - IdentifierTransform
        - PushDownTransform
        - ScalarTransformFunctionWrapper
        - SumPrecision
    - Fix bug in `transformToBytesValuesSV` method in BaseTransformFunction.
---
 .../org/apache/pinot/client/utils/DriverUtils.java |   7 +
 .../pinot/common/function/FunctionUtils.java       |   5 +
 .../org/apache/pinot/common/utils/DataSchema.java  |  14 +-
 .../org/apache/pinot/common/utils/DataTable.java   |   3 +
 .../apache/pinot/common/utils/PinotDataType.java   | 149 ++++++++++-
 .../apache/pinot/common/data/FieldSpecTest.java    |  21 ++
 .../org/apache/pinot/common/data/SchemaTest.java   |  22 +-
 .../pinot/common/utils/PinotDataTypeTest.java      |  33 ++-
 .../org/apache/pinot/core/common/BlockValSet.java  |  10 +-
 .../apache/pinot/core/common/DataBlockCache.java   |  30 +++
 .../org/apache/pinot/core/common/DataFetcher.java  |  45 ++++
 .../core/common/RowBasedBlockValueFetcher.java     |  15 ++
 .../pinot/core/common/datatable/BaseDataTable.java |  10 +
 .../core/common/datatable/DataTableBuilder.java    |  11 +
 .../evaluators/DefaultJsonPathEvaluator.java       |  77 ++++++
 .../operator/blocks/IntermediateResultsBlock.java  |   4 +
 .../core/operator/blocks/ProjectionBlock.java      |  12 +
 .../operator/docvalsets/ProjectionBlockValSet.java |   9 +
 .../operator/docvalsets/TransformBlockValSet.java  |   9 +
 .../operator/query/SelectionOrderByOperator.java   |   4 +
 .../transform/function/BaseTransformFunction.java  |  70 +++++-
 .../function/IdentifierTransformFunction.java      |  13 +
 .../JsonExtractScalarTransformFunction.java        |  62 ++++-
 .../function/LiteralTransformFunction.java         |  40 ++-
 .../function/PushDownTransformFunction.java        |  11 +
 .../function/ScalarTransformFunctionWrapper.java   |   3 +
 .../transform/function/TransformFunction.java      |   9 +
 .../function/SumPrecisionAggregationFunction.java  |  26 ++
 .../pinot/core/query/distinct/DistinctTable.java   |   7 +
 .../core/query/reduce/GroupByDataTableReducer.java |   3 +
 .../core/query/reduce/RowBasedBlockValSet.java     |  14 ++
 .../query/selection/SelectionOperatorService.java  |   8 +-
 .../query/selection/SelectionOperatorUtils.java    |   9 +
 .../apache/pinot/core/common/DataFetcherTest.java  |  53 +++-
 .../function/BaseTransformFunctionTest.java        |  43 ++++
 .../JsonExtractScalarTransformFunctionTest.java    |  39 ++-
 .../function/LiteralTransformFunctionTest.java     |   1 +
 .../ScalarTransformFunctionWrapperTest.java        |  15 ++
 .../pinot/queries/BigDecimalQueriesTest.java       | 272 +++++++++++++++++++++
 .../pinot/segment/local/io/util/ValueReader.java   |   6 +
 .../dictionary/BytesOffHeapMutableDictionary.java  |   7 +
 .../dictionary/BytesOnHeapMutableDictionary.java   |   7 +
 .../dictionary/DoubleOffHeapMutableDictionary.java |   6 +
 .../dictionary/DoubleOnHeapMutableDictionary.java  |   6 +
 .../dictionary/FloatOffHeapMutableDictionary.java  |   6 +
 .../dictionary/FloatOnHeapMutableDictionary.java   |   6 +
 .../dictionary/IntOffHeapMutableDictionary.java    |   6 +
 .../dictionary/IntOnHeapMutableDictionary.java     |   6 +
 .../dictionary/LongOffHeapMutableDictionary.java   |   6 +
 .../dictionary/LongOnHeapMutableDictionary.java    |   6 +
 .../dictionary/StringOffHeapMutableDictionary.java |   6 +
 .../dictionary/StringOnHeapMutableDictionary.java  |   6 +
 .../creator/impl/SegmentDictionaryCreator.java     |  50 ++++
 .../impl/SegmentIndexCreationDriverImpl.java       |   6 +-
 .../BigDecimalColumnPreIndexStatsCollector.java    | 125 ++++++++++
 .../stats/SegmentPreIndexStatsCollectorImpl.java   |   4 +
 .../index/column/PhysicalColumnIndexContainer.java |   7 +-
 .../index/readers/BaseImmutableDictionary.java     |  23 ++
 ...esDictionary.java => BigDecimalDictionary.java} |  47 ++--
 .../segment/index/readers/BytesDictionary.java     |   7 +
 .../readers/ConstantValueBytesDictionary.java      |   7 +
 .../readers/ConstantValueDoubleDictionary.java     |   6 +
 .../readers/ConstantValueFloatDictionary.java      |   6 +
 .../index/readers/ConstantValueIntDictionary.java  |   6 +
 .../index/readers/ConstantValueLongDictionary.java |   6 +
 .../readers/ConstantValueStringDictionary.java     |   6 +
 .../segment/index/readers/DocIdDictionary.java     |   6 +
 .../segment/index/readers/DoubleDictionary.java    |   6 +
 .../segment/index/readers/FloatDictionary.java     |   6 +
 .../local/segment/index/readers/IntDictionary.java |   6 +
 .../segment/index/readers/LongDictionary.java      |   6 +
 .../index/readers/OnHeapDoubleDictionary.java      |   6 +
 .../index/readers/OnHeapFloatDictionary.java       |   6 +
 .../segment/index/readers/OnHeapIntDictionary.java |   6 +
 .../index/readers/OnHeapLongDictionary.java        |   6 +
 .../index/readers/OnHeapStringDictionary.java      |   6 +
 .../segment/index/readers/StringDictionary.java    |   6 +
 .../local/utils/FixedIntArrayOffHeapIdMap.java     |   6 +
 .../local/segment/creator/DictionariesTest.java    |  55 +++++
 .../index/readers/ImmutableDictionaryTest.java     |  55 +++++
 .../ImmutableDictionaryTypeConversionTest.java     |  44 +++-
 .../segment/spi/evaluator/TransformEvaluator.java  |  13 +
 .../spi/index/metadata/ColumnMetadataImpl.java     |   5 +
 .../pinot/segment/spi/index/reader/Dictionary.java |   9 +
 .../spi/index/reader/ForwardIndexReader.java       |  48 ++++
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  13 +
 .../java/org/apache/pinot/spi/data/Schema.java     |   2 +
 .../org/apache/pinot/spi/utils/ArrayCopyUtils.java |  68 ++++++
 .../apache/pinot/spi/utils/BigDecimalUtils.java    |  17 ++
 .../java/org/apache/pinot/spi/utils/JsonUtils.java |   2 +
 90 files changed, 1908 insertions(+), 69 deletions(-)

diff --git a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DriverUtils.java b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DriverUtils.java
index ec1119e7c7..3f68460aaa 100644
--- a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DriverUtils.java
+++ b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DriverUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.client.utils;
 
+import java.math.BigDecimal;
 import java.net.URI;
 import java.sql.Timestamp;
 import java.sql.Types;
@@ -97,6 +98,9 @@ public class DriverUtils {
       case "DOUBLE":
         columnsSQLDataType = Types.DOUBLE;
         break;
+      case "BIG_DECIMAL":
+        columnsSQLDataType = Types.DECIMAL;
+        break;
       case "BOOLEAN":
         columnsSQLDataType = Types.BOOLEAN;
         break;
@@ -134,6 +138,9 @@ public class DriverUtils {
       case "DOUBLE":
         columnsJavaClassName = Double.class.getTypeName();
         break;
+      case "BIG_DECIMAL":
+        columnsJavaClassName = BigDecimal.class.getTypeName();
+        break;
       case "BOOLEAN":
         columnsJavaClassName = Boolean.class.getTypeName();
         break;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
index de2d0d4a2e..3c19473df5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.function;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,6 +42,7 @@ public class FunctionUtils {
     put(Float.class, PinotDataType.FLOAT);
     put(double.class, PinotDataType.DOUBLE);
     put(Double.class, PinotDataType.DOUBLE);
+    put(BigDecimal.class, PinotDataType.BIG_DECIMAL);
     put(boolean.class, PinotDataType.BOOLEAN);
     put(Boolean.class, PinotDataType.BOOLEAN);
     put(Timestamp.class, PinotDataType.TIMESTAMP);
@@ -64,6 +66,7 @@ public class FunctionUtils {
     put(Long.class, PinotDataType.LONG);
     put(Float.class, PinotDataType.FLOAT);
     put(Double.class, PinotDataType.DOUBLE);
+    put(BigDecimal.class, PinotDataType.BIG_DECIMAL);
     put(Timestamp.class, PinotDataType.TIMESTAMP);
     put(String.class, PinotDataType.STRING);
     put(byte[].class, PinotDataType.BYTES);
@@ -88,6 +91,7 @@ public class FunctionUtils {
     put(Float.class, DataType.FLOAT);
     put(double.class, DataType.DOUBLE);
     put(Double.class, DataType.DOUBLE);
+    put(BigDecimal.class, DataType.BIG_DECIMAL);
     put(boolean.class, DataType.BOOLEAN);
     put(Boolean.class, DataType.BOOLEAN);
     put(Timestamp.class, DataType.TIMESTAMP);
@@ -109,6 +113,7 @@ public class FunctionUtils {
     put(Float.class, ColumnDataType.FLOAT);
     put(double.class, ColumnDataType.DOUBLE);
     put(Double.class, ColumnDataType.DOUBLE);
+    put(BigDecimal.class, ColumnDataType.BIG_DECIMAL);
     put(boolean.class, ColumnDataType.BOOLEAN);
     put(Boolean.class, ColumnDataType.BOOLEAN);
     put(Timestamp.class, ColumnDataType.TIMESTAMP);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 915a8110ef..c44a6a964a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.Arrays;
@@ -242,6 +243,7 @@ public class DataSchema {
     LONG,
     FLOAT,
     DOUBLE,
+    BIG_DECIMAL,
     BOOLEAN /* Stored as INT */,
     TIMESTAMP /* Stored as LONG */,
     STRING,
@@ -257,7 +259,7 @@ public class DataSchema {
     BYTES_ARRAY,
     STRING_ARRAY;
 
-    private static final EnumSet<ColumnDataType> NUMERIC_TYPES = EnumSet.of(INT, LONG, FLOAT, DOUBLE);
+    private static final EnumSet<ColumnDataType> NUMERIC_TYPES = EnumSet.of(INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL);
     private static final EnumSet<ColumnDataType> INTEGRAL_TYPES = EnumSet.of(INT, LONG);
     private static final EnumSet<ColumnDataType> ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY,
         DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY);
@@ -316,6 +318,8 @@ public class DataSchema {
           return DataType.FLOAT;
         case DOUBLE:
           return DataType.DOUBLE;
+        case BIG_DECIMAL:
+          return DataType.BIG_DECIMAL;
         case BOOLEAN:
           return DataType.BOOLEAN;
         case TIMESTAMP:
@@ -345,6 +349,8 @@ public class DataSchema {
           return ((Number) value).floatValue();
         case DOUBLE:
           return ((Number) value).doubleValue();
+        case BIG_DECIMAL:
+          return (BigDecimal) value;
         case BOOLEAN:
           return (Integer) value == 1;
         case TIMESTAMP:
@@ -380,6 +386,8 @@ public class DataSchema {
      */
     public Serializable format(Object value) {
       switch (this) {
+        case BIG_DECIMAL:
+          return ((BigDecimal) value).toPlainString();
         case TIMESTAMP:
           assert value instanceof Timestamp;
           return value.toString();
@@ -403,6 +411,8 @@ public class DataSchema {
           return ((Number) value).floatValue();
         case DOUBLE:
           return ((Number) value).doubleValue();
+        case BIG_DECIMAL:
+          return (BigDecimal) value;
         case BOOLEAN:
           return (Integer) value == 1;
         case TIMESTAMP:
@@ -514,6 +524,8 @@ public class DataSchema {
           return FLOAT;
         case DOUBLE:
           return DOUBLE;
+        case BIG_DECIMAL:
+          return BIG_DECIMAL;
         case BOOLEAN:
           return BOOLEAN;
         case TIMESTAMP:
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index f73ab72c2f..9a78df64db 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.common.utils;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,6 +58,8 @@ public interface DataTable {
 
   double getDouble(int rowId, int colId);
 
+  BigDecimal getBigDecimal(int rowId, int colId);
+
   String getString(int rowId, int colId);
 
   ByteArray getBytes(int rowId, int colId);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index bd7503a2b6..9006ae0489 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -19,12 +19,14 @@
 package org.apache.pinot.common.utils;
 
 import com.fasterxml.jackson.core.JsonParseException;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.Base64;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -84,6 +86,11 @@ public enum PinotDataType {
       return ((Boolean) value) ? 1d : 0d;
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return ((Boolean) value) ? BigDecimal.ONE : BigDecimal.ZERO;
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Boolean) value;
@@ -136,6 +143,11 @@ public enum PinotDataType {
       return ((Number) value).doubleValue();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf(toInt(value));
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Byte) value != 0;
@@ -178,6 +190,11 @@ public enum PinotDataType {
       return (double) ((Character) value);
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf(toInt(value));
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Character) value != 0;
@@ -220,6 +237,11 @@ public enum PinotDataType {
       return ((Number) value).doubleValue();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf(toInt(value));
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Short) value != 0;
@@ -262,6 +284,11 @@ public enum PinotDataType {
       return ((Number) value).doubleValue();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf((Integer) value);
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Integer) value != 0;
@@ -309,6 +336,14 @@ public enum PinotDataType {
       return ((Number) value).doubleValue();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      // BigDecimal.valueOf(long) translates a long value into a BigDecimal value with a scale of zero.
+      // This "static factory method" is provided in preference to a (long) constructor because it allows for reuse of
+      // frequently used BigDecimal values.
+      return BigDecimal.valueOf((Long) value);
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Long) value != 0;
@@ -356,6 +391,11 @@ public enum PinotDataType {
       return ((Number) value).doubleValue();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf((Float) value);
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Float) value != 0;
@@ -403,6 +443,15 @@ public enum PinotDataType {
       return (Double) value;
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      // Note:
+      // - BigDecimal.valueOf(double): uses the canonical String representation of the double value passed
+      //     in to instantiate the BigDecimal object.
+      // - new BigDecimal(double): attempts to represent the double value as accurately as possible.
+      return BigDecimal.valueOf((Double) value);
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return (Double) value != 0;
@@ -429,6 +478,58 @@ public enum PinotDataType {
     }
   },
 
+  BIG_DECIMAL {
+    @Override
+    public int toInt(Object value) {
+      return ((Number) value).intValue();
+    }
+
+    @Override
+    public long toLong(Object value) {
+      return ((Number) value).longValue();
+    }
+
+    @Override
+    public float toFloat(Object value) {
+      return ((Number) value).floatValue();
+    }
+
+    @Override
+    public double toDouble(Object value) {
+      return ((Number) value).doubleValue();
+    }
+
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return (BigDecimal) value;
+    }
+
+    @Override
+    public boolean toBoolean(Object value) {
+      return ((BigDecimal) value).signum() != 0; // scale-insensitive: equals() treats 0.0 / 0E-5 as non-zero
+    }
+
+    @Override
+    public Timestamp toTimestamp(Object value) {
+      return new Timestamp(((Number) value).longValue());
+    }
+
+    @Override
+    public String toString(Object value) {
+      return ((BigDecimal) value).toPlainString();
+    }
+
+    @Override
+    public byte[] toBytes(Object value) {
+      return BigDecimalUtils.serialize((BigDecimal) value);
+    }
+
+    @Override
+    public BigDecimal convert(Object value, PinotDataType sourceType) {
+      return sourceType.toBigDecimal(value);
+    }
+  },
+
   /**
    * When converting from TIMESTAMP to other types:
    * - LONG/DOUBLE: millis since epoch value
@@ -461,6 +562,11 @@ public enum PinotDataType {
       return ((Timestamp) value).getTime();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf(toLong(value));
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       throw new UnsupportedOperationException("Cannot convert value from TIMESTAMP to BOOLEAN");
@@ -515,6 +621,11 @@ public enum PinotDataType {
       return Double.parseDouble(value.toString());
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return new BigDecimal(value.toString().trim());
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return BooleanUtils.toBoolean(value.toString().trim());
@@ -562,6 +673,11 @@ public enum PinotDataType {
       return Double.parseDouble(value.toString());
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return new BigDecimal(value.toString().trim());
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return Boolean.parseBoolean(value.toString().trim());
@@ -616,6 +732,11 @@ public enum PinotDataType {
       throw new UnsupportedOperationException("Cannot convert value from BYTES to DOUBLE");
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimalUtils.deserialize((byte[]) value);
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       throw new UnsupportedOperationException("Cannot convert value from BYTES to BOOLEAN");
@@ -663,6 +784,11 @@ public enum PinotDataType {
       return ((Number) value).doubleValue();
     }
 
+    @Override
+    public BigDecimal toBigDecimal(Object value) {
+      return BigDecimal.valueOf(((Number) value).doubleValue());
+    }
+
     @Override
     public boolean toBoolean(Object value) {
       return ((Number) value).doubleValue() != 0;
@@ -820,6 +946,10 @@ public enum PinotDataType {
     return getSingleValueType().toDouble(toObjectArray(value)[0]);
   }
 
+  public BigDecimal toBigDecimal(Object value) {
+    return getSingleValueType().toBigDecimal(toObjectArray(value)[0]);
+  }
+
   public boolean toBoolean(Object value) {
     return getSingleValueType().toBoolean(((Object[]) value)[0]);
   }
@@ -1118,12 +1248,12 @@ public enum PinotDataType {
     } else {
       Object[] valueArray = toObjectArray(value);
       int length = valueArray.length;
-      Timestamp[] booleanArray = new Timestamp[length];
+      Timestamp[] timestampArray = new Timestamp[length];
       PinotDataType singleValueType = getSingleValueType();
       for (int i = 0; i < length; i++) {
-        booleanArray[i] = singleValueType.toTimestamp(valueArray[i]);
+        timestampArray[i] = singleValueType.toTimestamp(valueArray[i]);
       }
-      return booleanArray;
+      return timestampArray;
     }
   }
 
@@ -1194,6 +1324,9 @@ public enum PinotDataType {
     if (cls == Double.class) {
       return DOUBLE;
     }
+    if (cls == BigDecimal.class) {
+      return BIG_DECIMAL;
+    }
     if (cls == String.class) {
       return STRING;
     }
@@ -1286,6 +1419,11 @@ public enum PinotDataType {
         return fieldSpec.isSingleValueField() ? FLOAT : FLOAT_ARRAY;
       case DOUBLE:
         return fieldSpec.isSingleValueField() ? DOUBLE : DOUBLE_ARRAY;
+      case BIG_DECIMAL:
+        if (fieldSpec.isSingleValueField()) {
+          return BIG_DECIMAL;
+        }
+        throw new IllegalStateException("There is no multi-value type for BigDecimal");
       case BOOLEAN:
         return fieldSpec.isSingleValueField() ? BOOLEAN : BOOLEAN_ARRAY;
       case TIMESTAMP:
@@ -1293,9 +1431,8 @@ public enum PinotDataType {
       case JSON:
         if (fieldSpec.isSingleValueField()) {
           return JSON;
-        } else {
-          throw new IllegalStateException("There is no multi-value type for JSON");
         }
+        throw new IllegalStateException("There is no multi-value type for JSON");
       case STRING:
         return fieldSpec.isSingleValueField() ? STRING : STRING_ARRAY;
       case BYTES:
@@ -1320,6 +1457,8 @@ public enum PinotDataType {
         return FLOAT;
       case DOUBLE:
         return DOUBLE;
+      case BIG_DECIMAL:
+        return BIG_DECIMAL;
       case BOOLEAN:
         return BOOLEAN;
       case TIMESTAMP:
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
index 6eb3d31d38..1d2c92a8a8 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.data;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -55,6 +56,7 @@ public class FieldSpecTest {
     Assert.assertEquals(LONG.getStoredType(), LONG);
     Assert.assertEquals(FLOAT.getStoredType(), FLOAT);
     Assert.assertEquals(DOUBLE.getStoredType(), DOUBLE);
+    Assert.assertEquals(BIG_DECIMAL.getStoredType(), BIG_DECIMAL);
     Assert.assertEquals(BOOLEAN.getStoredType(), INT);
     Assert.assertEquals(TIMESTAMP.getStoredType(), LONG);
     Assert.assertEquals(STRING.getStoredType(), STRING);
@@ -164,6 +166,17 @@ public class FieldSpecTest {
     Assert.assertEquals(fieldSpec1.hashCode(), fieldSpec2.hashCode());
     Assert.assertEquals(fieldSpec1.getDefaultNullValue(), 1L);
 
+    // Single-value BigDecimal type metric field with default null value.
+    fieldSpec1 = new MetricFieldSpec();
+    fieldSpec1.setName("svMetric");
+    fieldSpec1.setDataType(BIG_DECIMAL);
+    fieldSpec1.setDefaultNullValue(BigDecimal.ZERO);
+    fieldSpec2 = new MetricFieldSpec("svMetric", BIG_DECIMAL, BigDecimal.ZERO);
+    Assert.assertEquals(fieldSpec1, fieldSpec2);
+    Assert.assertEquals(fieldSpec1.toString(), fieldSpec2.toString());
+    Assert.assertEquals(fieldSpec1.hashCode(), fieldSpec2.hashCode());
+    Assert.assertEquals(fieldSpec1.getDefaultNullValue(), BigDecimal.ZERO);
+
     // Metric field with default null value for byte column.
     fieldSpec1 = new MetricFieldSpec();
     fieldSpec1.setName("byteMetric");
@@ -356,6 +369,14 @@ public class FieldSpecTest {
     first = JsonUtils.stringToObject(getRandomOrderJsonString(dateTimeFields), DateTimeFieldSpec.class);
     second = JsonUtils.stringToObject(first.toJsonObject().toString(), DateTimeFieldSpec.class);
     Assert.assertEquals(first, second, ERROR_MESSAGE);
+
+    // BigDecimal field
+    String[] metricFields = new String[]{
+        "\"name\":\"Salary\"", "\"dataType\":\"BIG_DECIMAL\""
+    };
+    first = JsonUtils.stringToObject(getRandomOrderJsonString(metricFields), MetricFieldSpec.class);
+    second = JsonUtils.stringToObject(first.toJsonObject().toString(), MetricFieldSpec.class);
+    Assert.assertEquals(first, second, ERROR_MESSAGE);
   }
 
   /**
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
index 24f65ca9ca..a53228e841 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.common.data;
 
 import java.io.File;
+import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Timestamp;
 import java.util.concurrent.TimeUnit;
@@ -83,13 +84,21 @@ public class SchemaTest {
     schemaToValidate.validate();
 
     schemaToValidate = new Schema();
-    schemaToValidate.addField(new MetricFieldSpec("m", FieldSpec.DataType.BOOLEAN, new Timestamp(0)));
+    schemaToValidate.addField(new MetricFieldSpec("m", FieldSpec.DataType.TIMESTAMP, new Timestamp(0)));
     try {
       schemaToValidate.validate();
       Assert.fail("Should have failed validation for invalid schema.");
     } catch (IllegalStateException e) {
       // expected
     }
+
+    schemaToValidate = new Schema();
+    schemaToValidate.addField(new MetricFieldSpec("d", FieldSpec.DataType.BIG_DECIMAL));
+    schemaToValidate.validate();
+
+    schemaToValidate = new Schema();
+    schemaToValidate.addField(new MetricFieldSpec("m", FieldSpec.DataType.BIG_DECIMAL, BigDecimal.ZERO));
+    schemaToValidate.validate();
   }
 
   @Test
@@ -97,6 +106,7 @@ public class SchemaTest {
     String defaultString = "default";
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("svDimension", FieldSpec.DataType.INT)
         .addSingleValueDimension("svDimensionWithDefault", FieldSpec.DataType.INT, 10)
+        .addMetric("svBigDecimalMetricWithDefault", FieldSpec.DataType.BIG_DECIMAL, BigDecimal.TEN)
         .addSingleValueDimension("svDimensionWithMaxLength", FieldSpec.DataType.STRING, 20000, null)
         .addMultiValueDimension("mvDimension", FieldSpec.DataType.STRING)
         .addMultiValueDimension("mvDimensionWithDefault", FieldSpec.DataType.STRING, defaultString)
@@ -122,6 +132,14 @@ public class SchemaTest {
     Assert.assertTrue(dimensionFieldSpec.isSingleValueField());
     Assert.assertEquals(dimensionFieldSpec.getDefaultNullValue(), 10);
 
+    MetricFieldSpec metricFieldSpec = schema.getMetricSpec("svBigDecimalMetricWithDefault");
+    Assert.assertNotNull(metricFieldSpec);
+    Assert.assertEquals(metricFieldSpec.getFieldType(), FieldSpec.FieldType.METRIC);
+    Assert.assertEquals(metricFieldSpec.getName(), "svBigDecimalMetricWithDefault");
+    Assert.assertEquals(metricFieldSpec.getDataType(), FieldSpec.DataType.BIG_DECIMAL);
+    Assert.assertTrue(metricFieldSpec.isSingleValueField());
+    Assert.assertEquals(metricFieldSpec.getDefaultNullValue(), BigDecimal.TEN);
+
     dimensionFieldSpec = schema.getDimensionSpec("svDimensionWithMaxLength");
     Assert.assertNotNull(dimensionFieldSpec);
     Assert.assertEquals(dimensionFieldSpec.getFieldType(), FieldSpec.FieldType.DIMENSION);
@@ -156,7 +174,7 @@ public class SchemaTest {
     Assert.assertEquals(dimensionFieldSpec.getMaxLength(), 20000);
     Assert.assertEquals(dimensionFieldSpec.getDefaultNullValue(), "null");
 
-    MetricFieldSpec metricFieldSpec = schema.getMetricSpec("metric");
+    metricFieldSpec = schema.getMetricSpec("metric");
     Assert.assertNotNull(metricFieldSpec);
     Assert.assertEquals(metricFieldSpec.getFieldType(), FieldSpec.FieldType.METRIC);
     Assert.assertEquals(metricFieldSpec.getName(), "metric");
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
index 002d1dcdc0..d7986a3caf 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -35,21 +36,23 @@ import static org.testng.Assert.fail;
 
 public class PinotDataTypeTest {
   private static final PinotDataType[] SOURCE_TYPES = {
-      BYTE, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, JSON, BYTE_ARRAY, CHARACTER_ARRAY, SHORT_ARRAY,
-      INTEGER_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY
+      BYTE, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, BIG_DECIMAL, STRING, JSON,
+      BYTE_ARRAY, CHARACTER_ARRAY, SHORT_ARRAY, INTEGER_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY
   };
   private static final Object[] SOURCE_VALUES = {
-      (byte) 123, (char) 123, (short) 123, 123, 123L, 123f, 123d, " 123", "123 ", new Object[]{(byte) 123},
-      new Object[]{(char) 123}, new Object[]{(short) 123}, new Object[]{123}, new Object[]{123L}, new Object[]{123f},
-      new Object[]{123d}, new Object[]{" 123"}
+      (byte) 123, (char) 123, (short) 123, 123, 123L, 123f, 123d, BigDecimal.valueOf(123), " 123", "123 ",
+      new Object[]{(byte) 123}, new Object[]{(char) 123}, new Object[]{(short) 123}, new Object[]{123},
+      new Object[]{123L}, new Object[]{123f}, new Object[]{123d}, new Object[]{" 123"}
   };
   private static final PinotDataType[] DEST_TYPES =
-      {INTEGER, LONG, FLOAT, DOUBLE, INTEGER_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY};
+      {INTEGER, LONG, FLOAT, DOUBLE, BIG_DECIMAL, INTEGER_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY};
   private static final Object[] EXPECTED_DEST_VALUES =
-      {123, 123L, 123f, 123d, new Object[]{123}, new Object[]{123L}, new Object[]{123f}, new Object[]{123d}};
+      {123, 123L, 123f, 123d, BigDecimal.valueOf(123), new Object[]{123}, new Object[]{123L}, new Object[]{123f},
+          new Object[]{123d}};
   private static final String[] EXPECTED_STRING_VALUES = {
       Byte.toString((byte) 123), Character.toString((char) 123), Short.toString((short) 123), Integer.toString(123),
-      Long.toString(123L), Float.toString(123f), Double.toString(123d), " 123", "123 ", Byte.toString((byte) 123),
+      Long.toString(123L), Float.toString(123f), Double.toString(123d),
+      (BigDecimal.valueOf(123)).toPlainString(), " 123", "123 ", Byte.toString((byte) 123),
       Character.toString((char) 123), Short.toString((short) 123), Integer.toString(123), Long.toString(123L),
       Float.toString(123f), Double.toString(123d), " 123"
   };
@@ -83,7 +86,15 @@ public class PinotDataTypeTest {
       int numSourceTypes = SOURCE_TYPES.length;
       for (int j = 0; j < numSourceTypes; j++) {
         Object actualDestValue = destType.convert(SOURCE_VALUES[j], SOURCE_TYPES[j]);
-        assertEquals(actualDestValue, expectedDestValue);
+        if (expectedDestValue.getClass().equals(BigDecimal.class)
+            || actualDestValue.getClass().equals(BigDecimal.class)) {
+          // Note: Unlike compareTo() method, BigDecimal equals() method considers two BigDecimal objects equal only
+          // if they are equal in value and scale, (thus 123.0 is not equal to 123 when compared by this method).
+          assertTrue(actualDestValue.equals(expectedDestValue)
+              || ((Comparable) expectedDestValue).compareTo(actualDestValue) == 0);
+        } else {
+          assertEquals(actualDestValue, expectedDestValue);
+        }
       }
     }
   }
@@ -241,6 +252,7 @@ public class PinotDataTypeTest {
     testCases.put(Long.class, LONG);
     testCases.put(Float.class, FLOAT);
     testCases.put(Double.class, DOUBLE);
+    testCases.put(BigDecimal.class, BIG_DECIMAL);
     testCases.put(Timestamp.class, TIMESTAMP);
     testCases.put(String.class, STRING);
     testCases.put(byte[].class, BYTES);
@@ -291,7 +303,8 @@ public class PinotDataTypeTest {
   @Test
   public void testInvalidConversion() {
     for (PinotDataType sourceType : values()) {
-      if (sourceType.isSingleValue() && sourceType != STRING && sourceType != BYTES && sourceType != JSON) {
+      if (sourceType.isSingleValue() && sourceType != STRING && sourceType != BYTES && sourceType != JSON
+          && sourceType != BIG_DECIMAL) {
         assertInvalidConversion(null, sourceType, BYTES, UnsupportedOperationException.class);
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
index 992b307e61..893be5acca 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.common;
 
+import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -84,6 +85,13 @@ public interface BlockValSet {
    */
   double[] getDoubleValuesSV();
 
+  /**
+   * Returns the BigDecimal values for a single-valued column.
+   *
+   * @return Array of BigDecimal values
+   */
+  BigDecimal[] getBigDecimalValuesSV();
+
   /**
    * Returns the string values for a single-valued column.
    *
@@ -94,7 +102,7 @@ public interface BlockValSet {
   /**
    * Returns the byte[] values for a single-valued column.
    *
-   * @return Array of string values
+   * @return Array of byte[] values
    */
   byte[][] getBytesValuesSV();
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index 2b0f66d7e8..1d341dc896 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.common;
 
+import java.math.BigDecimal;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -231,6 +232,35 @@ public class DataBlockCache {
     _dataFetcher.fetchDoubleValues(column, evaluator, _docIds, _length, buffer);
   }
 
+  /**
+   * Returns the BigDecimal values of a single-valued column, fetching them when markLoaded() reports that a
+   * fetch is needed.
+   *
+   * @param column Column name
+   * @return Array of BigDecimal values
+   */
+  public BigDecimal[] getBigDecimalValuesForSVColumn(String column) {
+    BigDecimal[] cachedValues = getValues(FieldSpec.DataType.BIG_DECIMAL, column);
+    if (!markLoaded(FieldSpec.DataType.BIG_DECIMAL, column)) {
+      // Nothing to fetch: hand back whatever array is currently cached for this column.
+      return cachedValues;
+    }
+    if (cachedValues == null) {
+      // No buffer registered for this column yet: allocate one of _length entries and cache it.
+      cachedValues = new BigDecimal[_length];
+      putValues(FieldSpec.DataType.BIG_DECIMAL, column, cachedValues);
+    }
+    _dataFetcher.fetchBigDecimalValues(column, _docIds, _length, cachedValues);
+    return cachedValues;
+  }
+
+  /**
+   * Fills the given buffer with the BigDecimal values produced by evaluating the transform evaluator against a
+   * column. The work is delegated to the data fetcher using this cache's current doc ids and length.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, BigDecimal[] buffer) {
+    _dataFetcher.fetchBigDecimalValues(column, evaluator, _docIds, _length, buffer);
+  }
+
   /**
    * Get the string values for a single-valued column.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index e366edc5a6..11cae5496a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.common;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -194,6 +195,32 @@ public class DataFetcher {
     _columnValueReaderMap.get(column).readDoubleValues(evaluator, inDocIds, length, outValues);
   }
 
+  /**
+   * Fetch the BigDecimal values for a single-valued column.
+   *
+   * @param column Column name
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchBigDecimalValues(String column, int[] inDocIds, int length, BigDecimal[] outValues) {
+    // Look up the value reader registered for this column and let it fill outValues.
+    _columnValueReaderMap.get(column).readBigDecimalValues(inDocIds, length, outValues);
+  }
+
+  /**
+   * Fetch and transform BigDecimal values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchBigDecimalValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      BigDecimal[] outValues) {
+    // Look up the value reader registered for this column and have it run the evaluator to fill outValues.
+    _columnValueReaderMap.get(column).readBigDecimalValues(evaluator, inDocIds, length, outValues);
+  }
+
   /**
    * Fetch the string values for a single-valued column.
    *
@@ -497,6 +524,24 @@ public class DataFetcher {
           valueBuffer);
     }
 
+    /**
+     * Reads the BigDecimal values of the given documents into valueBuffer, going through the dictionary when the
+     * column has one and reading the values from the forward index reader otherwise.
+     */
+    void readBigDecimalValues(int[] docIds, int length, BigDecimal[] valueBuffer) {
+      Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
+      ForwardIndexReaderContext readerCtx = getReaderContext();
+      if (_dictionary == null) {
+        // No dictionary: the reader supplies the values directly.
+        _reader.readValuesSV(docIds, length, valueBuffer, readerCtx);
+        return;
+      }
+      // Dictionary present: resolve doc ids to dict ids first, then dict ids to values.
+      int[] dictIds = THREAD_LOCAL_DICT_IDS.get();
+      _reader.readDictIds(docIds, length, dictIds, readerCtx);
+      _dictionary.readBigDecimalValues(dictIds, length, valueBuffer);
+    }
+
+    // Records the input data type for tracing, then hands the reader, its context, the dictionary and a dict id
+    // buffer to the evaluator, which writes its BigDecimal results into valueBuffer.
+    void readBigDecimalValues(TransformEvaluator evaluator, int[] docIds, int length, BigDecimal[] valueBuffer) {
+      Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valueBuffer);
+    }
+
     void readStringValues(int[] docIds, int length, String[] valueBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
       ForwardIndexReaderContext readerContext = getReaderContext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
index d678f9f5c9..64a3f6a788 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.common;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
 
@@ -60,6 +61,8 @@ public class RowBasedBlockValueFetcher {
           return new FloatSingleValueFetcher(blockValSet.getFloatValuesSV());
         case DOUBLE:
           return new DoubleSingleValueFetcher(blockValSet.getDoubleValuesSV());
+        case BIG_DECIMAL:
+          return new BigDecimalValueFetcher(blockValSet.getBigDecimalValuesSV());
         case STRING:
           return new StringSingleValueFetcher(blockValSet.getStringValuesSV());
         case BYTES:
@@ -137,6 +140,18 @@ public class RowBasedBlockValueFetcher {
     }
   }
 
+  /** {@link ValueFetcher} backed by an array of BigDecimal values supplied at construction. */
+  private static class BigDecimalValueFetcher implements ValueFetcher {
+    private final BigDecimal[] _values;
+
+    BigDecimalValueFetcher(BigDecimal[] values) {
+      _values = values;
+    }
+
+    // Returns the element of the backing array at index docId.
+    public BigDecimal getValue(int docId) {
+      return _values[docId];
+    }
+  }
+
   private static class StringSingleValueFetcher implements ValueFetcher {
     private final String[] _values;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 24d113bd7d..66f5086a01 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.common.datatable;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 
@@ -172,6 +174,14 @@ public abstract class BaseDataTable implements DataTable {
     return _fixedSizeData.getDouble();
   }
 
+  /**
+   * Reads the BigDecimal stored at the given row and column from the variable-size data buffer.
+   */
+  @Override
+  public BigDecimal getBigDecimal(int rowId, int colId) {
+    // The helper returns the byte length of the value; presumably it also moves _variableSizeData to the start
+    // of the value, which the slice() below relies on -- TODO confirm against positionCursorInVariableBuffer().
+    int size = positionCursorInVariableBuffer(rowId, colId);
+    ByteBuffer byteBuffer = _variableSizeData.slice();
+    // Restrict the slice to this value's bytes before handing it to the deserializer.
+    byteBuffer.limit(size);
+    return BigDecimalUtils.deserialize(byteBuffer);
+  }
+
   @Override
   public String getString(int rowId, int colId) {
     _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index e0e6c4edd8..a0b046a813 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -21,12 +21,14 @@ package org.apache.pinot.core.common.datatable;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 
 
@@ -159,6 +161,15 @@ public class DataTableBuilder {
     _currentRowDataByteBuffer.putDouble(value);
   }
 
+  /**
+   * Writes a BigDecimal value for the given column of the current row. The serialized bytes are appended to the
+   * variable-size data stream; the row buffer only records where they are and how long they are.
+   *
+   * @param colId column index
+   * @param value value to serialize via BigDecimalUtils.serialize()
+   * @throws IOException declared because the bytes are written to an output stream
+   */
+  public void setColumn(int colId, BigDecimal value)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    // First int: current size of the variable-size stream, i.e. the offset at which the bytes are appended below.
+    // This must be captured before the write.
+    _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    byte[] bytes = BigDecimalUtils.serialize(value);
+    // Second int: length of the serialized value.
+    _currentRowDataByteBuffer.putInt(bytes.length);
+    _variableSizeDataByteArrayOutputStream.write(bytes);
+  }
+
   public void setColumn(int colId, String value) {
     String columnName = _dataSchema.getColumnName(colId);
     Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
index b45acec9f4..75f70d7230 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.common.evaluators;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.InvalidPathException;
 import com.jayway.jsonpath.JsonPath;
@@ -25,6 +27,7 @@ import com.jayway.jsonpath.Option;
 import com.jayway.jsonpath.ParseContext;
 import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
 import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import java.math.BigDecimal;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.function.JsonPathCache;
@@ -38,10 +41,17 @@ import org.apache.pinot.spi.utils.JsonUtils;
 
 public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
 
+  private static final ObjectMapper OBJECT_MAPPER_WITH_BIG_DECIMAL = new ObjectMapper()
+      .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
+
   private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
       new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider())
           .mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
 
+  private static final ParseContext JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL = JsonPath.using(
+      new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider(OBJECT_MAPPER_WITH_BIG_DECIMAL))
+          .mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
+
   private static final int[] EMPTY_INTS = new int[0];
   private static final long[] EMPTY_LONGS = new long[0];
   private static final float[] EMPTY_FLOATS = new float[0];
@@ -196,6 +206,41 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
     }
   }
 
+  /**
+   * Evaluates the JSON path for each of the given documents and writes the results as BigDecimal values into
+   * valueBuffer. The JSON is parsed with the parser context that is configured to use BigDecimal for floats.
+   *
+   * @param docIds document ids to evaluate
+   * @param length number of entries of docIds to process
+   * @param reader forward index reader of the column
+   * @param context reader context passed through to the reader
+   * @param dictionary dictionary of the column, used only when the reader is dictionary encoded
+   * @param dictIdsBuffer scratch buffer that receives the dict ids when the reader is dictionary encoded
+   * @param valueBuffer output buffer, filled at indexes 0..length-1
+   * @throws IllegalStateException if the reader is not dictionary encoded and its value type is not JSON, STRING
+   *         or BYTES
+   */
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, BigDecimal[] valueBuffer) {
+    // A configured default that is not a BigDecimal instance is replaced by BigDecimal.ZERO here.
+    BigDecimal defaultValue = (_defaultValue instanceof BigDecimal) ? ((BigDecimal) _defaultValue) : BigDecimal.ZERO;
+    if (reader.isDictionaryEncoded()) {
+      // Dictionary-encoded column: resolve dict ids once, then extract from the dictionary values.
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromBytesWithExactBigDecimal(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      } else {
+        // Every other dictionary value type is read through getStringValue().
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromStringWithExactBigDecimal(dictionary, dictIdsBuffer[i]), defaultValue,
+              valueBuffer);
+        }
+      }
+    } else {
+      // Raw (non-dictionary) column: extract directly from the forward index, per document.
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromStringWithExactBigDecimal(reader, context, docIds[i]), defaultValue,
+                valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromBytesWithExactBigDecimal(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
   public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
       ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, String[] valueBuffer) {
     if (reader.isDictionaryEncoded()) {
@@ -397,6 +442,15 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
     return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId, context)).read(_jsonPath);
   }
 
+  // Parses the dictionary's bytes value for dictId as UTF-8 JSON, using the parser context configured to use
+  // BigDecimal for floats, and evaluates the JSON path against it.
+  private <T> T extractFromBytesWithExactBigDecimal(Dictionary dictionary, int dictId) {
+    return JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parseUtf8(dictionary.getBytesValue(dictId)).read(_jsonPath);
+  }
+
+  /**
+   * Reads the raw bytes of the given document from the forward index, parses them as UTF-8 JSON with the parser
+   * context configured to use BigDecimal for floats, and evaluates the JSON path against the result.
+   *
+   * <p>The result type is generic (as in the dictionary-based overload) instead of being fixed to BigDecimal.
+   * The path may resolve to a non-BigDecimal value such as an integer or a string; with a BigDecimal return type
+   * the compiler-inserted cast would throw ClassCastException here, before processValue() gets the chance to
+   * convert the value.
+   *
+   * @return the value the JSON path resolves to
+   */
+  private <T, R extends ForwardIndexReaderContext> T extractFromBytesWithExactBigDecimal(
+      ForwardIndexReader<R> reader, R context, int docId) {
+    return JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parseUtf8(reader.getBytes(docId, context)).read(_jsonPath);
+  }
+
   private <T> T extractFromString(Dictionary dictionary, int dictId) {
     return JSON_PARSER_CONTEXT.parse(dictionary.getStringValue(dictId)).read(_jsonPath);
   }
@@ -406,6 +460,15 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
     return JSON_PARSER_CONTEXT.parseUtf8(reader.getBytes(docId, context)).read(_jsonPath);
   }
 
+  // Parses the dictionary's string value for dictId as JSON, using the parser context configured to use
+  // BigDecimal for floats, and evaluates the JSON path against it.
+  private <T> T extractFromStringWithExactBigDecimal(Dictionary dictionary, int dictId) {
+    return JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parse(dictionary.getStringValue(dictId)).read(_jsonPath);
+  }
+
+  /**
+   * Reads the given document from the forward index as UTF-8 bytes, parses them as JSON with the parser context
+   * configured to use BigDecimal for floats, and evaluates the JSON path against the result.
+   *
+   * <p>The result type is generic (as in the dictionary-based overload) instead of being fixed to BigDecimal.
+   * The path may resolve to a non-BigDecimal value such as an integer or a string; with a BigDecimal return type
+   * the compiler-inserted cast would throw ClassCastException here, before processValue() gets the chance to
+   * convert the value.
+   *
+   * @return the value the JSON path resolves to
+   */
+  private <T, R extends ForwardIndexReaderContext> T extractFromStringWithExactBigDecimal(
+      ForwardIndexReader<R> reader, R context, int docId) {
+    return JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parseUtf8(reader.getBytes(docId, context)).read(_jsonPath);
+  }
+
   private void processValue(int index, Object value, int defaultValue, int[] valueBuffer) {
     if (value instanceof Number) {
       valueBuffer[index] = ((Number) value).intValue();
@@ -463,6 +526,20 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
     }
   }
 
+  /**
+   * Stores the extracted value at the given index of valueBuffer as a BigDecimal.
+   *
+   * <p>A null value is replaced by defaultValue when a default is configured; otherwise
+   * throwPathNotFoundException() is invoked. A BigDecimal is stored as is, and any other value is converted
+   * through its string representation.
+   */
+  private void processValue(int index, Object value, BigDecimal defaultValue, BigDecimal[] valueBuffer) {
+    if (value == null) {
+      if (_defaultValue == null) {
+        throwPathNotFoundException();
+      } else {
+        valueBuffer[index] = defaultValue;
+      }
+      return;
+    }
+    valueBuffer[index] = (value instanceof BigDecimal) ? (BigDecimal) value : new BigDecimal(value.toString());
+  }
+
   private void processValue(int index, Object value, String[] valueBuffer) {
     if (value instanceof String) {
       valueBuffer[index] = (String) value;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index bdaa3bd444..b7bb826db5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.blocks;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -360,6 +361,9 @@ public class IntermediateResultsBlock implements Block {
       case DOUBLE:
         dataTableBuilder.setColumn(columnIndex, (double) value);
         break;
+      case BIG_DECIMAL:
+        dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
+        break;
       case STRING:
         dataTableBuilder.setColumn(columnIndex, (String) value);
         break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
index b13cfa8afd..066d27b350 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
+import java.math.BigDecimal;
 import java.util.Map;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
@@ -119,6 +120,17 @@ public class ProjectionBlock implements Block {
     _dataBlockCache.fillValues(column, evaluator, buffer);
   }
 
+  /**
+   * Pushes a {@link TransformEvaluator} which will produce a BigDecimal value down
+   * to be evaluated against the column. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, BigDecimal[] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
   /**
    * Pushes a {@see TransformEvaluator} which will produce a String value down
    * to be evaluated against the column. This is an unstable API.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
index 4963b779ad..29f3861320 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.docvalsets;
 
+import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.DataBlockCache;
@@ -107,6 +108,14 @@ public class ProjectionBlockValSet implements BlockValSet {
     }
   }
 
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    // The tracing scope is opened for the duration of the read and closed by try-with-resources.
+    try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) {
+      recordReadValues(scope, DataType.BIG_DECIMAL, true);
+      // The values themselves come from the data block cache for this value set's column.
+      return _dataBlockCache.getBigDecimalValuesForSVColumn(_column);
+    }
+  }
+
   @Override
   public String[] getStringValuesSV() {
     try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
index 9b71541d2a..031a02e4a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.docvalsets;
 
+import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
@@ -103,6 +104,14 @@ public class TransformBlockValSet implements BlockValSet {
     }
   }
 
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    // The tracing scope is opened for the duration of the transform and closed by try-with-resources.
+    try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) {
+      recordTransformValues(scope, DataType.BIG_DECIMAL, true);
+      // The values are produced by applying the transform function to the projection block.
+      return _transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    }
+  }
+
   @Override
   public String[] getStringValuesSV() {
     try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index b6eeb82fec..f01cd2bcd2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.query;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -161,6 +162,9 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
           case DOUBLE:
             result = ((Double) v1).compareTo((Double) v2);
             break;
+          case BIG_DECIMAL:
+            result = ((BigDecimal) v1).compareTo((BigDecimal) v2);
+            break;
           case STRING:
             result = ((String) v1).compareTo((String) v2);
             break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java
index 45d7f0ef06..55de43a258 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.transform.function;
 
 import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -38,6 +39,8 @@ public abstract class BaseTransformFunction implements TransformFunction {
       new TransformResultMetadata(DataType.FLOAT, true, false);
   protected static final TransformResultMetadata DOUBLE_SV_NO_DICTIONARY_METADATA =
       new TransformResultMetadata(DataType.DOUBLE, true, false);
+  protected static final TransformResultMetadata BIG_DECIMAL_SV_NO_DICTIONARY_METADATA =
+      new TransformResultMetadata(DataType.BIG_DECIMAL, true, false);
   protected static final TransformResultMetadata BOOLEAN_SV_NO_DICTIONARY_METADATA =
       new TransformResultMetadata(DataType.BOOLEAN, true, false);
   protected static final TransformResultMetadata TIMESTAMP_SV_NO_DICTIONARY_METADATA =
@@ -55,6 +58,7 @@ public abstract class BaseTransformFunction implements TransformFunction {
   protected long[] _longValuesSV;
   protected float[] _floatValuesSV;
   protected double[] _doubleValuesSV;
+  protected BigDecimal[] _bigDecimalValuesSV;
   protected String[] _stringValuesSV;
   protected byte[][] _byteValuesSV;
   protected int[][] _intValuesMV;
@@ -104,6 +108,10 @@ public abstract class BaseTransformFunction implements TransformFunction {
           double[] doubleValues = transformToDoubleValuesSV(projectionBlock);
           ArrayCopyUtils.copy(doubleValues, _intValuesSV, length);
           break;
+        case BIG_DECIMAL:
+          BigDecimal[] bigDecimalValues = transformToBigDecimalValuesSV(projectionBlock);
+          ArrayCopyUtils.copy(bigDecimalValues, _intValuesSV, length);
+          break;
         case STRING:
           String[] stringValues = transformToStringValuesSV(projectionBlock);
           ArrayCopyUtils.copy(stringValues, _intValuesSV, length);
@@ -141,6 +149,10 @@ public abstract class BaseTransformFunction implements TransformFunction {
           double[] doubleValues = transformToDoubleValuesSV(projectionBlock);
           ArrayCopyUtils.copy(doubleValues, _longValuesSV, length);
           break;
+        case BIG_DECIMAL:
+          BigDecimal[] bigDecimalValues = transformToBigDecimalValuesSV(projectionBlock);
+          ArrayCopyUtils.copy(bigDecimalValues, _longValuesSV, length);
+          break;
         case STRING:
           String[] stringValues = transformToStringValuesSV(projectionBlock);
           ArrayCopyUtils.copy(stringValues, _longValuesSV, length);
@@ -178,6 +190,10 @@ public abstract class BaseTransformFunction implements TransformFunction {
           double[] doubleValues = transformToDoubleValuesSV(projectionBlock);
           ArrayCopyUtils.copy(doubleValues, _floatValuesSV, length);
           break;
+        case BIG_DECIMAL:
+          BigDecimal[] bigDecimalValues = transformToBigDecimalValuesSV(projectionBlock);
+          ArrayCopyUtils.copy(bigDecimalValues, _floatValuesSV, length);
+          break;
         case STRING:
           String[] stringValues = transformToStringValuesSV(projectionBlock);
           ArrayCopyUtils.copy(stringValues, _floatValuesSV, length);
@@ -215,6 +231,10 @@ public abstract class BaseTransformFunction implements TransformFunction {
           float[] floatValues = transformToFloatValuesSV(projectionBlock);
           ArrayCopyUtils.copy(floatValues, _doubleValuesSV, length);
           break;
+        case BIG_DECIMAL:
+          BigDecimal[] bigDecimalValues = transformToBigDecimalValuesSV(projectionBlock);
+          ArrayCopyUtils.copy(bigDecimalValues, _doubleValuesSV, length);
+          break;
         case STRING:
           String[] stringValues = transformToStringValuesSV(projectionBlock);
           ArrayCopyUtils.copy(stringValues, _doubleValuesSV, length);
@@ -226,6 +246,50 @@ public abstract class BaseTransformFunction implements TransformFunction {
     return _doubleValuesSV;
   }
 
+  /**
+   * Returns the single-valued results as BigDecimal values in the reusable {@code _bigDecimalValuesSV} buffer.
+   * Values are read from the dictionary when the function has one; otherwise they are fetched in the result's
+   * stored type and copied into the buffer. A stored type without a case below raises IllegalStateException.
+   */
+  @Override
+  public BigDecimal[] transformToBigDecimalValuesSV(ProjectionBlock projectionBlock) {
+    int numDocs = projectionBlock.getNumDocs();
+    if (_bigDecimalValuesSV == null || _bigDecimalValuesSV.length < numDocs) {
+      _bigDecimalValuesSV = new BigDecimal[numDocs];
+    }
+
+    Dictionary dict = getDictionary();
+    if (dict != null) {
+      // Dictionary available: look the values up by dictionary id
+      dict.readBigDecimalValues(transformToDictIdsSV(projectionBlock), numDocs, _bigDecimalValuesSV);
+      return _bigDecimalValuesSV;
+    }
+    // No dictionary: fetch the values in the stored type, then copy them into the BigDecimal buffer
+    switch (getResultMetadata().getDataType().getStoredType()) {
+      case INT:
+        ArrayCopyUtils.copy(transformToIntValuesSV(projectionBlock), _bigDecimalValuesSV, numDocs);
+        break;
+      case LONG:
+        ArrayCopyUtils.copy(transformToLongValuesSV(projectionBlock), _bigDecimalValuesSV, numDocs);
+        break;
+      case FLOAT:
+        ArrayCopyUtils.copy(transformToFloatValuesSV(projectionBlock), _bigDecimalValuesSV, numDocs);
+        break;
+      case DOUBLE:
+        ArrayCopyUtils.copy(transformToDoubleValuesSV(projectionBlock), _bigDecimalValuesSV, numDocs);
+        break;
+      case STRING:
+        ArrayCopyUtils.copy(transformToStringValuesSV(projectionBlock), _bigDecimalValuesSV, numDocs);
+        break;
+      case BYTES:
+        ArrayCopyUtils.copy(transformToBytesValuesSV(projectionBlock), _bigDecimalValuesSV, numDocs);
+        break;
+      default:
+        throw new IllegalStateException();
+    }
+    return _bigDecimalValuesSV;
+  }
+
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
     int length = projectionBlock.getNumDocs();
@@ -256,6 +320,10 @@ public abstract class BaseTransformFunction implements TransformFunction {
           double[] doubleValues = transformToDoubleValuesSV(projectionBlock);
           ArrayCopyUtils.copy(doubleValues, _stringValuesSV, length);
           break;
+        case BIG_DECIMAL:
+          BigDecimal[] bigDecimalValues = transformToBigDecimalValuesSV(projectionBlock);
+          ArrayCopyUtils.copy(bigDecimalValues, _stringValuesSV, length);
+          break;
         case BYTES:
           byte[][] bytesValues = transformToBytesValuesSV(projectionBlock);
           ArrayCopyUtils.copy(bytesValues, _stringValuesSV, length);
@@ -278,7 +346,7 @@ public abstract class BaseTransformFunction implements TransformFunction {
     Dictionary dictionary = getDictionary();
     if (dictionary != null) {
       int[] dictIds = transformToDictIdsSV(projectionBlock);
-      dictionary.readIntValues(dictIds, length, _intValuesSV);
+      dictionary.readBytesValues(dictIds, length, _byteValuesSV);
     } else {
       Preconditions.checkState(getResultMetadata().getDataType().getStoredType() == DataType.STRING);
       String[] stringValues = transformToStringValuesSV(projectionBlock);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
index 11db593b32..f6b8038858 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
@@ -99,6 +100,12 @@ public class IdentifierTransformFunction implements TransformFunction, PushDownT
     return projectionBlock.getBlockValueSet(_columnName).getDoubleValuesSV();
   }
 
+  /** Returns the BigDecimal values read from the block value set of the {@code _columnName} column. */
+  @Override
+  public BigDecimal[] transformToBigDecimalValuesSV(ProjectionBlock projectionBlock) {
+    return projectionBlock.getBlockValueSet(_columnName).getBigDecimalValuesSV();
+  }
+
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
     return projectionBlock.getBlockValueSet(_columnName).getStringValuesSV();
@@ -155,6 +162,12 @@ public class IdentifierTransformFunction implements TransformFunction, PushDownT
     projectionBlock.fillValues(_columnName, evaluator, buffer);
   }
 
+  @Override
+  public void transformToBigDecimalValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      BigDecimal[] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);  // delegates to the block for this column
+  }
+
   @Override
   public void transformToStringValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
       String[] buffer) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index 7bcff9b556..b30774a5f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
 import com.jayway.jsonpath.ParseContext;
 import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
 import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.function.JsonPathCache;
@@ -46,13 +49,20 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * jsonExtractScalar(jsonFieldName, 'jsonPath', 'resultsType')
  * <code>jsonFieldName</code> is the Json String field/expression.
  * <code>jsonPath</code> is a JsonPath expression which used to read from JSON document
- * <code>results_type</code> refers to the results data type, could be INT, LONG, FLOAT, DOUBLE, STRING, INT_ARRAY,
- * LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY.
+ * <code>resultsType</code> refers to the results data type, could be INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL, BOOLEAN,
+ * TIMESTAMP, STRING, INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY.
  *
  */
 public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "jsonExtractScalar";
 
+  private static final ObjectMapper OBJECT_MAPPER_WITH_BIG_DECIMAL = new ObjectMapper()
+      .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
+
+  private static final ParseContext JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL = JsonPath.using(
+      new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider(OBJECT_MAPPER_WITH_BIG_DECIMAL))
+          .mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
+
   private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
       new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider())
           .mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
@@ -99,8 +109,8 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
     } catch (Exception e) {
       throw new IllegalStateException(String.format(
           "Unsupported results type: %s for jsonExtractScalar function. Supported types are: "
-              + "INT/LONG/FLOAT/DOUBLE/BOOLEAN/TIMESTAMP/STRING/INT_ARRAY/LONG_ARRAY/FLOAT_ARRAY/DOUBLE_ARRAY"
-              + "/STRING_ARRAY", resultsType));
+              + "INT/LONG/FLOAT/DOUBLE/BOOLEAN/BIG_DECIMAL/TIMESTAMP/STRING/INT_ARRAY/LONG_ARRAY/FLOAT_ARRAY"
+              + "/DOUBLE_ARRAY/STRING_ARRAY", resultsType));
     }
   }
 
@@ -279,6 +289,48 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
     return _doubleValuesSV;
   }
 
+  @Override
+  public BigDecimal[] transformToBigDecimalValuesSV(ProjectionBlock projectionBlock) {
+    int requiredLength = projectionBlock.getNumDocs();
+    if (_bigDecimalValuesSV == null || _bigDecimalValuesSV.length < requiredLength) {
+      _bigDecimalValuesSV = new BigDecimal[requiredLength];  // (re)allocate the reusable result buffer
+    }
+    if (!(_jsonFieldTransformFunction instanceof PushDownTransformFunction)) {
+      return transformTransformedValuesToBigDecimalValuesSV(projectionBlock);  // evaluate the JSON path here
+    }
+    PushDownTransformFunction pushDown = (PushDownTransformFunction) _jsonFieldTransformFunction;
+    pushDown.transformToBigDecimalValuesSV(projectionBlock, _jsonPathEvaluator, _bigDecimalValuesSV);
+    return _bigDecimalValuesSV;  // filled by the push-down call above
+  }
+
+  private BigDecimal[] transformTransformedValuesToBigDecimalValuesSV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      Object result = null;
+      try {
+        result = JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+        // Deliberately ignored: a failed parse/read leaves result null, which is handled as a missing value below
+      }
+      if (result == null) {
+        if (_defaultValue != null) {
+          _bigDecimalValuesSV[i] = (BigDecimal) _defaultValue;
+          continue;
+        }
+        throw new RuntimeException(
+            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPathString, jsonStrings[i]));
+      }
+      // USE_BIG_DECIMAL_FOR_FLOATS only covers floating-point JSON numbers; integers arrive as Integer, Long or
+      // BigInteger, so cast only an actual BigDecimal and convert everything else through its string form.
+      _bigDecimalValuesSV[i] =
+          result instanceof BigDecimal ? (BigDecimal) result : new BigDecimal(result.toString());
+    }
+    return _bigDecimalValuesSV;
+  }
+
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
     int numDocs = projectionBlock.getNumDocs();
@@ -301,7 +353,7 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
     for (int i = 0; i < numDocs; i++) {
       Object result = null;
       try {
-        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+        result = JSON_PARSER_CONTEXT_WITH_BIG_DECIMAL.parse(jsonStrings[i]).read(_jsonPath);
       } catch (Exception ignored) {
       }
       if (result == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
index 4571552106..522b58e154 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
@@ -20,11 +20,13 @@ package org.apache.pinot.core.operator.transform.function;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -45,12 +47,14 @@ public class LiteralTransformFunction implements TransformFunction {
   private final long _longLiteral;
   private final float _floatLiteral;
   private final double _doubleLiteral;
+  private final BigDecimal _bigDecimalLiteral;
 
   // literals may be shared but values are intentionally not volatile as assignment races are benign
   private int[] _intResult;
   private long[] _longResult;
   private float[] _floatResult;
   private double[] _doubleResult;
+  private BigDecimal[] _bigDecimalResult;
   private String[] _stringResult;
   private byte[][] _bytesResult;
 
@@ -58,17 +62,21 @@ public class LiteralTransformFunction implements TransformFunction {
     _literal = literal;
     _dataType = inferLiteralDataType(literal);
     if (_dataType.isNumeric()) {
-      BigDecimal bigDecimal = new BigDecimal(_literal);
-      _intLiteral = bigDecimal.intValue();
-      _longLiteral = bigDecimal.longValue();
-      _floatLiteral = bigDecimal.floatValue();
-      _doubleLiteral = bigDecimal.doubleValue();
+      _bigDecimalLiteral = new BigDecimal(_literal);
+    } else if (_dataType == DataType.BOOLEAN) {
+      _bigDecimalLiteral = PinotDataType.BOOLEAN.toBigDecimal(Boolean.valueOf(literal));
+    } else if (_dataType == DataType.TIMESTAMP) {
+      // inferLiteralDataType successfully interpreted the literal as TIMESTAMP. _bigDecimalLiteral is populated and
+      // assigned to _longLiteral.
+      _bigDecimalLiteral = PinotDataType.TIMESTAMP.toBigDecimal(Timestamp.valueOf(literal));
     } else {
-      _intLiteral = 0;
-      _longLiteral = 0L;
-      _floatLiteral = 0F;
-      _doubleLiteral = 0D;
+      _bigDecimalLiteral = BigDecimal.ZERO;
     }
+
+    _intLiteral = _bigDecimalLiteral.intValue();
+    _longLiteral = _bigDecimalLiteral.longValue();
+    _floatLiteral = _bigDecimalLiteral.floatValue();
+    _doubleLiteral = _bigDecimalLiteral.doubleValue();
   }
 
   @VisibleForTesting
@@ -84,6 +92,8 @@ public class LiteralTransformFunction implements TransformFunction {
         return DataType.FLOAT;
       } else if (number instanceof Double) {
         return DataType.DOUBLE;
+      } else if (number instanceof BigDecimal | number instanceof BigInteger) {
+        return DataType.BIG_DECIMAL;
       } else {
         return DataType.STRING;
       }
@@ -206,6 +216,18 @@ public class LiteralTransformFunction implements TransformFunction {
     return doubleResult;
   }
 
+  @Override
+  public BigDecimal[] transformToBigDecimalValuesSV(ProjectionBlock projectionBlock) {
+    int numDocs = projectionBlock.getNumDocs();
+    BigDecimal[] bigDecimalResult = _bigDecimalResult;  // read the field once and work on the local reference
+    if (bigDecimalResult == null || bigDecimalResult.length < numDocs) {  // none yet, or too short
+      bigDecimalResult = new BigDecimal[numDocs];
+      Arrays.fill(bigDecimalResult, _bigDecimalLiteral);  // every entry is the same literal value
+      _bigDecimalResult = bigDecimalResult;  // store the array only after it has been filled
+    }
+    return bigDecimalResult;  // may be longer than numDocs when an earlier, larger array is reused
+  }
+
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
     int numDocs = projectionBlock.getNumDocs();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
index 743d715603..4af9cfd232 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
+import java.math.BigDecimal;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
 
@@ -64,6 +65,16 @@ public interface PushDownTransformFunction {
    */
   void transformToDoubleValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, double[] buffer);
 
+  /**
+   * Transforms the data from the given projection block to single-valued BigDecimal values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToBigDecimalValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      BigDecimal[] buffer);
+
   /**
    * Transforms the data from the given projection block to single-valued string values.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
index 39755a0db5..51e7a751f4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
@@ -362,6 +362,9 @@ public class ScalarTransformFunctionWrapper extends BaseTransformFunction {
         case DOUBLE:
           _nonLiteralValues[i] = ArrayUtils.toObject(transformFunction.transformToDoubleValuesSV(projectionBlock));
           break;
+        case BIG_DECIMAL:
+          _nonLiteralValues[i] = transformFunction.transformToBigDecimalValuesSV(projectionBlock);
+          break;
         case BOOLEAN: {
           int[] intValues = transformFunction.transformToIntValuesSV(projectionBlock);
           int numValues = intValues.length;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunction.java
index 49b48f34e7..eae54d1441 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
@@ -117,6 +118,14 @@ public interface TransformFunction {
    */
   double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock);
 
+  /**
+   * Transforms the data from the given projection block to single-valued BigDecimal values.
+   *
+   * @param projectionBlock Projection result
+   * @return Transformation result
+   */
+  BigDecimal[] transformToBigDecimalValuesSV(ProjectionBlock projectionBlock);
+
   /**
    * Transforms the data from the given projection block to single-valued string values.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumPrecisionAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumPrecisionAggregationFunction.java
index 20fb335fc4..f6aee3cc82 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumPrecisionAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumPrecisionAggregationFunction.java
@@ -108,6 +108,12 @@ public class SumPrecisionAggregationFunction extends BaseSingleInputAggregationF
           sum = sum.add(new BigDecimal(stringValues[i]));
         }
         break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        for (int i = 0; i < length; i++) {
+          sum = sum.add(bigDecimalValues[i]);
+        }
+        break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
@@ -154,6 +160,15 @@ public class SumPrecisionAggregationFunction extends BaseSingleInputAggregationF
           groupByResultHolder.setValueForKey(groupKey, sum);
         }
         break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        for (int i = 0; i < length; i++) {
+          int groupKey = groupKeyArray[i];
+          BigDecimal sum = getDefaultResult(groupByResultHolder, groupKey);
+          sum = sum.add(bigDecimalValues[i]);
+          groupByResultHolder.setValueForKey(groupKey, sum);
+        }
+        break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
@@ -208,6 +223,17 @@ public class SumPrecisionAggregationFunction extends BaseSingleInputAggregationF
           }
         }
         break;
+      case BIG_DECIMAL:
+        BigDecimal[] bigDecimalValues = blockValSet.getBigDecimalValuesSV();
+        for (int i = 0; i < length; i++) {
+          BigDecimal value = bigDecimalValues[i];
+          for (int groupKey : groupKeysArray[i]) {
+            BigDecimal sum = getDefaultResult(groupByResultHolder, groupKey);
+            sum = sum.add(value);
+            groupByResultHolder.setValueForKey(groupKey, sum);
+          }
+        }
+        break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
index edb60a1750..371e1a0519 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
@@ -24,6 +24,7 @@ import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -258,6 +259,9 @@ public class DistinctTable {
           case DOUBLE:
             dataTableBuilder.setColumn(i, (double) values[i]);
             break;
+          case BIG_DECIMAL:
+            dataTableBuilder.setColumn(i, (BigDecimal) values[i]);
+            break;
           case STRING:
             dataTableBuilder.setColumn(i, (String) values[i]);
             break;
@@ -302,6 +306,9 @@ public class DistinctTable {
           case DOUBLE:
             values[j] = dataTable.getDouble(i, j);
             break;
+          case BIG_DECIMAL:
+            values[j] = dataTable.getBigDecimal(i, j);
+            break;
           case STRING:
             values[j] = dataTable.getString(i, j);
             break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index db01acd686..4b8170b457 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -366,6 +366,9 @@ public class GroupByDataTableReducer implements DataTableReducer {
                     case DOUBLE:
                       values[colId] = dataTable.getDouble(rowId, colId);
                       break;
+                    case BIG_DECIMAL:
+                      values[colId] = dataTable.getBigDecimal(rowId, colId);
+                      break;
                     case STRING:
                       values[colId] = dataTable.getString(rowId, colId);
                       break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/RowBasedBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/RowBasedBlockValSet.java
index 90a759d75a..f09958b6b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/RowBasedBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/RowBasedBlockValSet.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import java.math.BigDecimal;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -37,12 +39,14 @@ import org.apache.pinot.spi.data.FieldSpec;
 public class RowBasedBlockValSet implements BlockValSet {
 
   private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
   private final List<Object[]> _rows;
   private final int _columnIndex;
 
   public RowBasedBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object[]> rows,
       int columnIndex) {
     _dataType = columnDataType.toDataType();
+    _pinotDataType = PinotDataType.getPinotDataTypeForExecution(columnDataType);
     _rows = rows;
     _columnIndex = columnIndex;
   }
@@ -140,6 +144,16 @@ public class RowBasedBlockValSet implements BlockValSet {
     return values;
   }
 
+  @Override
+  public BigDecimal[] getBigDecimalValuesSV() {
+    BigDecimal[] converted = new BigDecimal[_rows.size()];  // one entry per row
+    int rowId = 0;
+    for (Object[] row : _rows) {
+      converted[rowId++] = _pinotDataType.toBigDecimal(row[_columnIndex]);  // cell of this set's column
+    }
+    return converted;
+  }
+
   @Override
   public String[] getStringValuesSV() {
     int length = _rows.size();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 3c72709590..d3d6784d3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -104,13 +104,15 @@ public class SelectionOperatorService {
 
     int numValuesToCompare = valueIndexList.size();
     int[] valueIndices = new int[numValuesToCompare];
-    boolean[] isNumber = new boolean[numValuesToCompare];
+    boolean[] useDoubleComparison = new boolean[numValuesToCompare];
     // Use multiplier -1 or 1 to control ascending/descending order
     int[] multipliers = new int[numValuesToCompare];
     for (int i = 0; i < numValuesToCompare; i++) {
       int valueIndex = valueIndexList.get(i);
       valueIndices[i] = valueIndex;
-      isNumber[i] = columnDataTypes[valueIndex].isNumber();
+      if (columnDataTypes[valueIndex].isNumber()) {
+        useDoubleComparison[i] = true;
+      }
       multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
     }
 
@@ -120,7 +122,7 @@ public class SelectionOperatorService {
         Object v1 = o1[index];
         Object v2 = o2[index];
         int result;
-        if (isNumber[i]) {
+        if (useDoubleComparison[i]) {
           result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
         } else {
           //noinspection unchecked
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 375e2bd79b..075ff14dab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.selection;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.text.DecimalFormat;
 import java.text.DecimalFormatSymbols;
@@ -273,6 +274,9 @@ public class SelectionOperatorUtils {
           case DOUBLE:
             dataTableBuilder.setColumn(i, ((Number) columnValue).doubleValue());
             break;
+          case BIG_DECIMAL:
+            dataTableBuilder.setColumn(i, (BigDecimal) columnValue);
+            break;
           case STRING:
             dataTableBuilder.setColumn(i, ((String) columnValue));
             break;
@@ -367,6 +371,9 @@ public class SelectionOperatorUtils {
         case DOUBLE:
           row[i] = dataTable.getDouble(rowId, i);
           break;
+        case BIG_DECIMAL:
+          row[i] = dataTable.getBigDecimal(rowId, i);
+          break;
         case STRING:
           row[i] = dataTable.getString(rowId, i);
           break;
@@ -551,6 +558,8 @@ public class SelectionOperatorUtils {
         return THREAD_LOCAL_FLOAT_FORMAT.get().format(((Number) value).floatValue());
       case DOUBLE:
         return THREAD_LOCAL_DOUBLE_FORMAT.get().format(((Number) value).doubleValue());
+      case BIG_DECIMAL:
+        return ((BigDecimal) value).toPlainString();
       case BOOLEAN:
         return (Integer) value == 1 ? "true" : "false";
       case TIMESTAMP:
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/DataFetcherTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/DataFetcherTest.java
index 72cced2809..cb138599bd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/DataFetcherTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/DataFetcherTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.common;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -60,6 +61,7 @@ public class DataFetcherTest {
   private static final String LONG_COLUMN = "long";
   private static final String FLOAT_COLUMN = "float";
   private static final String DOUBLE_COLUMN = "double";
+  private static final String BIG_DECIMAL_COLUMN = "bigDecimal";
   private static final String STRING_COLUMN = "string";
   private static final String BYTES_COLUMN = "bytes";
   private static final String HEX_STRING_COLUMN = "hex_string";
@@ -67,6 +69,7 @@ public class DataFetcherTest {
   private static final String NO_DICT_LONG_COLUMN = "no_dict_long";
   private static final String NO_DICT_FLOAT_COLUMN = "no_dict_float";
   private static final String NO_DICT_DOUBLE_COLUMN = "no_dict_double";
+  private static final String NO_DICT_BIG_DECIMAL_COLUMN = "no_dict_big_decimal";
   private static final String NO_DICT_STRING_COLUMN = "no_dict_string";
   private static final String NO_DICT_BYTES_COLUMN = "no_dict_bytes";
   private static final String NO_DICT_HEX_STRING_COLUMN = "no_dict_hex_string";
@@ -95,6 +98,9 @@ public class DataFetcherTest {
       row.putValue(NO_DICT_FLOAT_COLUMN, (float) value);
       row.putValue(DOUBLE_COLUMN, (double) value);
       row.putValue(NO_DICT_DOUBLE_COLUMN, (double) value);
+      BigDecimal bigDecimalValue = BigDecimal.valueOf(value);
+      row.putValue(BIG_DECIMAL_COLUMN, bigDecimalValue);
+      row.putValue(NO_DICT_BIG_DECIMAL_COLUMN, bigDecimalValue);
       String stringValue = Integer.toString(value);
       row.putValue(STRING_COLUMN, stringValue);
       row.putValue(NO_DICT_STRING_COLUMN, stringValue);
@@ -116,6 +122,7 @@ public class DataFetcherTest {
         new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension(INT_COLUMN, DataType.INT)
             .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
             .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE)
+            .addMetric(BIG_DECIMAL_COLUMN, DataType.BIG_DECIMAL)
             .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
             .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES)
             .addSingleValueDimension(HEX_STRING_COLUMN, DataType.STRING)
@@ -123,6 +130,7 @@ public class DataFetcherTest {
             .addSingleValueDimension(NO_DICT_LONG_COLUMN, DataType.LONG)
             .addSingleValueDimension(NO_DICT_FLOAT_COLUMN, DataType.FLOAT)
             .addSingleValueDimension(NO_DICT_DOUBLE_COLUMN, DataType.DOUBLE)
+            .addMetric(NO_DICT_BIG_DECIMAL_COLUMN, DataType.BIG_DECIMAL)
             .addSingleValueDimension(NO_DICT_STRING_COLUMN, DataType.STRING)
             .addSingleValueDimension(NO_DICT_BYTES_COLUMN, DataType.BYTES)
             .addSingleValueDimension(NO_DICT_HEX_STRING_COLUMN, DataType.STRING).build();
@@ -148,11 +156,13 @@ public class DataFetcherTest {
     testFetchIntValues(LONG_COLUMN);
     testFetchIntValues(FLOAT_COLUMN);
     testFetchIntValues(DOUBLE_COLUMN);
+    testFetchIntValues(BIG_DECIMAL_COLUMN);
     testFetchIntValues(STRING_COLUMN);
     testFetchIntValues(NO_DICT_INT_COLUMN);
     testFetchIntValues(NO_DICT_LONG_COLUMN);
     testFetchIntValues(NO_DICT_FLOAT_COLUMN);
     testFetchIntValues(NO_DICT_DOUBLE_COLUMN);
+    testFetchIntValues(NO_DICT_BIG_DECIMAL_COLUMN);
     testFetchIntValues(NO_DICT_STRING_COLUMN);
   }
 
@@ -162,11 +172,13 @@ public class DataFetcherTest {
     testFetchLongValues(LONG_COLUMN);
     testFetchLongValues(FLOAT_COLUMN);
     testFetchLongValues(DOUBLE_COLUMN);
+    testFetchLongValues(BIG_DECIMAL_COLUMN);
     testFetchLongValues(STRING_COLUMN);
     testFetchLongValues(NO_DICT_INT_COLUMN);
     testFetchLongValues(NO_DICT_LONG_COLUMN);
     testFetchLongValues(NO_DICT_FLOAT_COLUMN);
     testFetchLongValues(NO_DICT_DOUBLE_COLUMN);
+    testFetchLongValues(NO_DICT_BIG_DECIMAL_COLUMN);
     testFetchLongValues(NO_DICT_STRING_COLUMN);
   }
 
@@ -176,11 +188,13 @@ public class DataFetcherTest {
     testFetchFloatValues(LONG_COLUMN);
     testFetchFloatValues(FLOAT_COLUMN);
     testFetchFloatValues(DOUBLE_COLUMN);
+    testFetchFloatValues(BIG_DECIMAL_COLUMN);
     testFetchFloatValues(STRING_COLUMN);
     testFetchFloatValues(NO_DICT_INT_COLUMN);
     testFetchFloatValues(NO_DICT_LONG_COLUMN);
     testFetchFloatValues(NO_DICT_FLOAT_COLUMN);
     testFetchFloatValues(NO_DICT_DOUBLE_COLUMN);
+    testFetchFloatValues(NO_DICT_BIG_DECIMAL_COLUMN);
     testFetchFloatValues(NO_DICT_STRING_COLUMN);
   }
 
@@ -190,25 +204,45 @@ public class DataFetcherTest {
     testFetchDoubleValues(LONG_COLUMN);
     testFetchDoubleValues(FLOAT_COLUMN);
     testFetchDoubleValues(DOUBLE_COLUMN);
+    testFetchDoubleValues(BIG_DECIMAL_COLUMN);
     testFetchDoubleValues(STRING_COLUMN);
     testFetchDoubleValues(NO_DICT_INT_COLUMN);
     testFetchDoubleValues(NO_DICT_LONG_COLUMN);
     testFetchDoubleValues(NO_DICT_FLOAT_COLUMN);
     testFetchDoubleValues(NO_DICT_DOUBLE_COLUMN);
+    testFetchDoubleValues(NO_DICT_BIG_DECIMAL_COLUMN);
     testFetchDoubleValues(NO_DICT_STRING_COLUMN);
   }
 
+  @Test
+  public void testFetchBigDecimalValues() {
+    testFetchBigDecimalValues(INT_COLUMN);
+    testFetchBigDecimalValues(LONG_COLUMN);
+    testFetchBigDecimalValues(FLOAT_COLUMN);
+    testFetchBigDecimalValues(DOUBLE_COLUMN);
+    testFetchBigDecimalValues(BIG_DECIMAL_COLUMN);
+    testFetchBigDecimalValues(STRING_COLUMN);
+    testFetchBigDecimalValues(NO_DICT_INT_COLUMN);
+    testFetchBigDecimalValues(NO_DICT_LONG_COLUMN);
+    testFetchBigDecimalValues(NO_DICT_FLOAT_COLUMN);
+    testFetchBigDecimalValues(NO_DICT_DOUBLE_COLUMN);
+    testFetchBigDecimalValues(NO_DICT_BIG_DECIMAL_COLUMN);
+    testFetchBigDecimalValues(NO_DICT_STRING_COLUMN);
+  }
+
   @Test
   public void testFetchStringValues() {
     testFetchStringValues(INT_COLUMN);
     testFetchStringValues(LONG_COLUMN);
     testFetchStringValues(FLOAT_COLUMN);
     testFetchStringValues(DOUBLE_COLUMN);
+    testFetchStringValues(BIG_DECIMAL_COLUMN);
     testFetchStringValues(STRING_COLUMN);
     testFetchStringValues(NO_DICT_INT_COLUMN);
     testFetchStringValues(NO_DICT_LONG_COLUMN);
     testFetchStringValues(NO_DICT_FLOAT_COLUMN);
     testFetchStringValues(NO_DICT_DOUBLE_COLUMN);
+    testFetchStringValues(NO_DICT_BIG_DECIMAL_COLUMN);
     testFetchStringValues(NO_DICT_STRING_COLUMN);
   }
 
@@ -224,8 +258,8 @@ public class DataFetcherTest {
   public void testFetchHexStringValues() {
     testFetchHexStringValues(BYTES_COLUMN);
     testFetchHexStringValues(HEX_STRING_COLUMN);
-    testFetchBytesValues(NO_DICT_BYTES_COLUMN);
-    testFetchBytesValues(NO_DICT_HEX_STRING_COLUMN);
+    testFetchHexStringValues(NO_DICT_BYTES_COLUMN);
+    testFetchHexStringValues(NO_DICT_HEX_STRING_COLUMN);
   }
 
   public void testFetchIntValues(String column) {
@@ -288,6 +322,21 @@ public class DataFetcherTest {
     }
   }
 
+  public void testFetchBigDecimalValues(String column) {
+    int[] docIds = new int[NUM_ROWS];
+    int length = 0;
+    for (int i = RANDOM.nextInt(MAX_STEP_LENGTH); i < NUM_ROWS; i += RANDOM.nextInt(MAX_STEP_LENGTH) + 1) {
+      docIds[length++] = i;
+    }
+
+    BigDecimal[] bigDecimalValues = new BigDecimal[length];
+    _dataFetcher.fetchBigDecimalValues(column, docIds, length, bigDecimalValues);
+
+    for (int i = 0; i < length; i++) {
+      Assert.assertEquals(bigDecimalValues[i].intValue(), _values[docIds[i]], ERROR_MESSAGE);
+    }
+  }
+
   public void testFetchStringValues(String column) {
     int[] docIds = new int[NUM_ROWS];
     int length = 0;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 845a8f50be..d40d482a6d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.transform.function;
 
 import java.io.File;
+import java.math.BigDecimal;
 import java.text.DecimalFormat;
 import java.text.DecimalFormatSymbols;
 import java.util.ArrayList;
@@ -49,6 +50,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -69,6 +71,7 @@ public abstract class BaseTransformFunctionTest {
   protected static final String LONG_SV_COLUMN = "longSV";
   protected static final String FLOAT_SV_COLUMN = "floatSV";
   protected static final String DOUBLE_SV_COLUMN = "doubleSV";
+  protected static final String BIG_DECIMAL_SV_COLUMN = "bigDecimalSV";
   protected static final String STRING_SV_COLUMN = "stringSV";
   protected static final String BYTES_SV_COLUMN = "bytesSV";
   protected static final String STRING_ALPHANUM_SV_COLUMN = "stringAlphaNumSV";
@@ -85,6 +88,7 @@ public abstract class BaseTransformFunctionTest {
   protected final long[] _longSVValues = new long[NUM_ROWS];
   protected final float[] _floatSVValues = new float[NUM_ROWS];
   protected final double[] _doubleSVValues = new double[NUM_ROWS];
+  protected final BigDecimal[] _bigDecimalSVValues = new BigDecimal[NUM_ROWS];
   protected final String[] _stringSVValues = new String[NUM_ROWS];
   protected final String[] _stringAlphaNumericSVValues = new String[NUM_ROWS];
   protected final byte[][] _bytesSVValues = new byte[NUM_ROWS][];
@@ -112,6 +116,7 @@ public abstract class BaseTransformFunctionTest {
       _longSVValues[i] = RANDOM.nextLong();
       _floatSVValues[i] = _intSVValues[i] * RANDOM.nextFloat();
       _doubleSVValues[i] = _intSVValues[i] * RANDOM.nextDouble();
+      _bigDecimalSVValues[i] = BigDecimal.valueOf(RANDOM.nextDouble()).multiply(BigDecimal.valueOf(_intSVValues[i]));
       _stringSVValues[i] = df.format(_intSVValues[i] * RANDOM.nextDouble());
       _stringAlphaNumericSVValues[i] = RandomStringUtils.randomAlphanumeric(26);
       _bytesSVValues[i] = RandomStringUtils.randomAlphanumeric(26).getBytes();
@@ -144,6 +149,7 @@ public abstract class BaseTransformFunctionTest {
       map.put(LONG_SV_COLUMN, _longSVValues[i]);
       map.put(FLOAT_SV_COLUMN, _floatSVValues[i]);
       map.put(DOUBLE_SV_COLUMN, _doubleSVValues[i]);
+      map.put(BIG_DECIMAL_SV_COLUMN, _bigDecimalSVValues[i]);
       map.put(STRING_SV_COLUMN, _stringSVValues[i]);
       map.put(STRING_ALPHANUM_SV_COLUMN, _stringAlphaNumericSVValues[i]);
       map.put(BYTES_SV_COLUMN, _bytesSVValues[i]);
@@ -166,6 +172,7 @@ public abstract class BaseTransformFunctionTest {
         .addSingleValueDimension(LONG_SV_COLUMN, FieldSpec.DataType.LONG)
         .addSingleValueDimension(FLOAT_SV_COLUMN, FieldSpec.DataType.FLOAT)
         .addSingleValueDimension(DOUBLE_SV_COLUMN, FieldSpec.DataType.DOUBLE)
+        .addMetric(BIG_DECIMAL_SV_COLUMN, FieldSpec.DataType.BIG_DECIMAL)
         .addSingleValueDimension(STRING_SV_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_ALPHANUM_SV_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(BYTES_SV_COLUMN, FieldSpec.DataType.BYTES)
@@ -204,12 +211,14 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       Assert.assertEquals(intValues[i], expectedValues[i]);
       Assert.assertEquals(longValues[i], expectedValues[i]);
       Assert.assertEquals(floatValues[i], (float) expectedValues[i]);
       Assert.assertEquals(doubleValues[i], (double) expectedValues[i]);
+      Assert.assertEquals(bigDecimalValues[i].intValue(), expectedValues[i]);
       Assert.assertEquals(stringValues[i], Integer.toString(expectedValues[i]));
     }
   }
@@ -219,12 +228,14 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       Assert.assertEquals(intValues[i], (int) expectedValues[i]);
       Assert.assertEquals(longValues[i], expectedValues[i]);
       Assert.assertEquals(floatValues[i], (float) expectedValues[i]);
       Assert.assertEquals(doubleValues[i], (double) expectedValues[i]);
+      Assert.assertEquals(bigDecimalValues[i].longValue(), expectedValues[i]);
       Assert.assertEquals(stringValues[i], Long.toString(expectedValues[i]));
     }
   }
@@ -234,12 +245,14 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       Assert.assertEquals(intValues[i], (int) expectedValues[i]);
       Assert.assertEquals(longValues[i], (long) expectedValues[i]);
       Assert.assertEquals(floatValues[i], expectedValues[i]);
       Assert.assertEquals(doubleValues[i], (double) expectedValues[i]);
+      Assert.assertEquals(bigDecimalValues[i].floatValue(), expectedValues[i]);
       Assert.assertEquals(stringValues[i], Float.toString(expectedValues[i]));
     }
   }
@@ -249,16 +262,46 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = null;
+    try {
+      // 1- Some transform functions cannot work with BigDecimal (e.g. exp, ln, and sqrt).
+      // 2- NumberFormatException is thrown when converting Double.NaN, Double.POSITIVE_INFINITY,
+      // or Double.NEGATIVE_INFINITY.
+      bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    } catch (UnsupportedOperationException | NumberFormatException ignored) {
+    }
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       Assert.assertEquals(intValues[i], (int) expectedValues[i]);
       Assert.assertEquals(longValues[i], (long) expectedValues[i]);
       Assert.assertEquals(floatValues[i], (float) expectedValues[i]);
       Assert.assertEquals(doubleValues[i], expectedValues[i]);
+      if (bigDecimalValues != null) {
+        Assert.assertEquals(bigDecimalValues[i].doubleValue(), expectedValues[i]);
+      }
       Assert.assertEquals(stringValues[i], Double.toString(expectedValues[i]));
     }
   }
 
+  protected void testTransformFunction(TransformFunction transformFunction, BigDecimal[] expectedValues) {
+    int[] intValues = transformFunction.transformToIntValuesSV(_projectionBlock);
+    long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
+    float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
+    double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    byte[][] bytesValues = transformFunction.transformToBytesValuesSV(_projectionBlock);
+    String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Assert.assertEquals(intValues[i], expectedValues[i].intValue());
+      Assert.assertEquals(longValues[i], expectedValues[i].longValue());
+      Assert.assertEquals(floatValues[i], expectedValues[i].floatValue());
+      Assert.assertEquals(doubleValues[i], expectedValues[i].doubleValue());
+      Assert.assertEquals(bigDecimalValues[i].compareTo(expectedValues[i]), 0);
+      Assert.assertEquals(BigDecimalUtils.deserialize(bytesValues[i]).compareTo(expectedValues[i]), 0);
+      Assert.assertEquals((new BigDecimal(stringValues[i])).compareTo(expectedValues[i]), 0);
+    }
+  }
+
   protected void testTransformFunction(TransformFunction transformFunction, String[] expectedValues) {
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunctionTest.java
index de7d45fe9e..284fa5e494 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.transform.function;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -72,6 +73,12 @@ public class JsonExtractScalarTransformFunctionTest extends BaseTransformFunctio
             Assert.assertEquals(doubleValues[i], _doubleSVValues[i]);
           }
           break;
+        case BIG_DECIMAL:
+          BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+          for (int i = 0; i < NUM_ROWS; i++) {
+            Assert.assertEquals(bigDecimalValues[i].compareTo(_bigDecimalSVValues[i]), 0);
+          }
+          break;
         case STRING:
           String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
           for (int i = 0; i < NUM_ROWS; i++) {
@@ -107,13 +114,20 @@ public class JsonExtractScalarTransformFunctionTest extends BaseTransformFunctio
         new Object[]{"jsonExtractScalar(json,'$.longSV','LONG')", FieldSpec.DataType.LONG, true},
         new Object[]{"jsonExtractScalar(json,'$.floatSV','FLOAT')", FieldSpec.DataType.FLOAT, true},
         new Object[]{"jsonExtractScalar(json,'$.doubleSV','DOUBLE')", FieldSpec.DataType.DOUBLE, true},
+        new Object[]{"jsonExtractScalar(json,'$.bigDecimalSV','BIG_DECIMAL')", FieldSpec.DataType.BIG_DECIMAL, true},
+        // Test operating on the output of another transform (trim) to avoid passing the evaluation down to the
+        // storage in order to test transformTransformedValuesToXXXValuesSV() methods.
+        new Object[]{"jsonExtractScalar(trim(json),'$.bigDecimalSV','BIG_DECIMAL')",
+            FieldSpec.DataType.BIG_DECIMAL, true},
         new Object[]{"jsonExtractScalar(json,'$.stringSV','STRING')", FieldSpec.DataType.STRING, true},
         new Object[]{"json_extract_scalar(json,'$.intSV','INT', '0')", FieldSpec.DataType.INT, true},
         new Object[]{"json_extract_scalar(json,'$.intMV','INT_ARRAY', '0')", FieldSpec.DataType.INT, false},
         new Object[]{"json_extract_scalar(json,'$.longSV','LONG', '0')", FieldSpec.DataType.LONG, true},
         new Object[]{"json_extract_scalar(json,'$.floatSV','FLOAT', '0.0')", FieldSpec.DataType.FLOAT, true},
         new Object[]{"json_extract_scalar(json,'$.doubleSV','DOUBLE', '0.0')", FieldSpec.DataType.DOUBLE, true},
-        new Object[]{"json_extract_scalar(json,'$.stringSV','STRING', 'null')", FieldSpec.DataType.STRING, true}
+        new Object[]{"json_extract_scalar(json,'$.stringSV','STRING', 'null')", FieldSpec.DataType.STRING, true},
+        new Object[]{"jsonExtractScalar(json,'$.bigDecimalSV','BIG_DECIMAL', '0.0')",
+            FieldSpec.DataType.BIG_DECIMAL, true},
     };
     //@formatter:on
   }
@@ -126,6 +140,9 @@ public class JsonExtractScalarTransformFunctionTest extends BaseTransformFunctio
     TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
     Assert.assertTrue(transformFunction instanceof JsonExtractScalarTransformFunction);
     Assert.assertEquals(transformFunction.getName(), JsonExtractScalarTransformFunction.FUNCTION_NAME);
+    // Note: transformToStringValuesSV() calls IdentifierTransformFunction.transformToStringValuesSV(), which in turn
+    //  calls DataFetcher.readStringValues(), which calls DefaultJsonPathEvaluator.evaluateBlock(); that method parses
+    //  Strings without exact BigDecimal support. Therefore, testing string parsing of BigDecimal is disabled.
     String[] resultValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       if (_stringSVValues[i].equals(_stringSVValues[0])) {
@@ -136,9 +153,13 @@ public class JsonExtractScalarTransformFunctionTest extends BaseTransformFunctio
             Assert.assertEquals(_intMVValues[i][j], ((List) resultMap.get(0).get("intMV")).get(j));
           }
           Assert.assertEquals(_longSVValues[i], resultMap.get(0).get("longSV"));
-          Assert.assertEquals(Float.compare(_floatSVValues[i], ((Double) resultMap.get(0).get("floatSV")).floatValue()),
+          // Note: since we currently use a mapper that parses exact big decimals, doubles may get parsed as
+          // big decimals. TODO: confirm that this is a backward-compatible change.
+          Assert.assertEquals(Float.compare(_floatSVValues[i], ((Number) resultMap.get(0).get("floatSV")).floatValue()),
               0);
           Assert.assertEquals(_doubleSVValues[i], resultMap.get(0).get("doubleSV"));
+          // Disabled:
+          // Assert.assertEquals(_bigDecimalSVValues[i], (BigDecimal) resultMap.get(0).get("bigDecimalSV"));
           Assert.assertEquals(_stringSVValues[i], resultMap.get(0).get("stringSV"));
         } catch (IOException e) {
           throw new RuntimeException();
@@ -204,6 +225,19 @@ public class JsonExtractScalarTransformFunctionTest extends BaseTransformFunctio
     }
   }
 
+  @Test
+  public void testJsonPathTransformFunctionForBigDecimal() {
+    ExpressionContext expression =
+        RequestContextUtils.getExpressionFromSQL("jsonExtractScalar(json,'$.bigDecimalSV','BIG_DECIMAL')");
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof JsonExtractScalarTransformFunction);
+    Assert.assertEquals(transformFunction.getName(), JsonExtractScalarTransformFunction.FUNCTION_NAME);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Assert.assertEquals(bigDecimalValues[i].compareTo(_bigDecimalSVValues[i]), 0);
+    }
+  }
+
   @Test
   public void testJsonPathTransformFunctionForString() {
     ExpressionContext expression =
@@ -232,6 +266,7 @@ public class JsonExtractScalarTransformFunctionTest extends BaseTransformFunctio
       Assert.assertTrue(keys.contains(String.format("$['%s']", LONG_SV_COLUMN)));
       Assert.assertTrue(keys.contains(String.format("$['%s']", FLOAT_SV_COLUMN)));
       Assert.assertTrue(keys.contains(String.format("$['%s']", DOUBLE_SV_COLUMN)));
+      Assert.assertTrue(keys.contains(String.format("$['%s']", BIG_DECIMAL_SV_COLUMN)));
       Assert.assertTrue(keys.contains(String.format("$['%s']", STRING_SV_COLUMN)));
       Assert.assertTrue(keys.contains(String.format("$['%s']", INT_MV_COLUMN)));
       Assert.assertTrue(keys.contains(String.format("$['%s']", TIME_COLUMN)));
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunctionTest.java
index 529245d2e3..754ab5b59a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunctionTest.java
@@ -32,6 +32,7 @@ public class LiteralTransformFunctionTest {
     Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("2147483649"), DataType.LONG);
     Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("1.2"), DataType.FLOAT);
     Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("41241241.2412"), DataType.DOUBLE);
+    Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("1.7976931348623159e+308"), DataType.BIG_DECIMAL);
     Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("true"), DataType.BOOLEAN);
     Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("false"), DataType.BOOLEAN);
     Assert.assertEquals(LiteralTransformFunction.inferLiteralDataType("2020-02-02 20:20:20.20"), DataType.TIMESTAMP);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapperTest.java
index 1cdfe7c8c6..9f071d2281 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapperTest.java
@@ -246,6 +246,21 @@ public class ScalarTransformFunctionWrapperTest extends BaseTransformFunctionTes
     testTransformFunction(transformFunction, expectedValues);
   }
 
+  @Test
+  public void testIsNullOperator() {
+    ExpressionContext expression = RequestContextUtils.getExpressionFromSQL(String.format("%s IS NULL",
+        BIG_DECIMAL_SV_COLUMN));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    assertTrue(transformFunction instanceof IsNullTransformFunction);
+    assertEquals(transformFunction.getName(), "is_null");
+    int[] expectedValues = new int[NUM_ROWS];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedValues[i] = (_bigDecimalSVValues[i] == null) ? 1 : 0;
+    }
+    testTransformFunction(transformFunction, expectedValues);
+  }
+
+
   @Test
   public void testStringContainsTransformFunction() {
     ExpressionContext expression =
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
new file mode 100644
index 0000000000..a80d51050d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Queries test for BIG_DECIMAL data type.
+ */
+public class BigDecimalQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BigDecimalQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+  private static final BigDecimal BASE_BIG_DECIMAL = BigDecimal.valueOf(RANDOM.nextDouble());
+
+  private static final int NUM_RECORDS = 1000;
+
+  private static final String BIG_DECIMAL_COLUMN = "bigDecimalColumn";
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().addMetric(BIG_DECIMAL_COLUMN, DataType.BIG_DECIMAL).build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private BigDecimal _sum;
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    BigDecimal sum = BigDecimal.ZERO;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      BigDecimal value = BASE_BIG_DECIMAL.add(BigDecimal.valueOf(i));
+      // Insert data in 3 different formats
+      if (i % 3 == 0) {
+        record.putValue(BIG_DECIMAL_COLUMN, value);
+      } else if (i % 3 == 1) {
+        record.putValue(BIG_DECIMAL_COLUMN, BigDecimalUtils.serialize(value));
+      } else {
+        record.putValue(BIG_DECIMAL_COLUMN, value.toPlainString());
+      }
+      sum = sum.add(value);
+      records.add(record);
+    }
+    _sum = sum;
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @Test
+  public void testQueries() {
+    {
+      String query = "SELECT * FROM testTable";
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema,
+          new DataSchema(new String[]{BIG_DECIMAL_COLUMN}, new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 10);
+      for (int i = 0; i < 10; i++) {
+        Object[] row = rows.get(i);
+        assertEquals(row.length, 1);
+        assertEquals(row[0], BASE_BIG_DECIMAL.add(BigDecimal.valueOf(i)));
+      }
+    }
+    {
+      String query = String.format("SELECT * FROM testTable ORDER BY %s DESC LIMIT 40", BIG_DECIMAL_COLUMN);
+      // getBrokerResponse(query) runs the SQL query on multiple index segments. The result should be equivalent
+      // to querying 4 identical index segments.
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema,
+          new DataSchema(new String[]{BIG_DECIMAL_COLUMN}, new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 40);
+      for (int i = 0; i < 10; i++) {
+        BigDecimal expectedResult = BASE_BIG_DECIMAL.add(BigDecimal.valueOf(NUM_RECORDS - 1 - i));
+        for (int j = 0; j < 4; j++) {
+          Object[] row = rows.get(i * 4 + j);
+          assertEquals(row.length, 1);
+          assertEquals(row[0], expectedResult);
+        }
+      }
+    }
+    {
+      String query = String.format("SELECT DISTINCT %s FROM testTable ORDER BY %s", BIG_DECIMAL_COLUMN,
+          BIG_DECIMAL_COLUMN);
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema,
+          new DataSchema(new String[]{BIG_DECIMAL_COLUMN}, new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 10);
+      for (int i = 0; i < 10; i++) {
+        Object[] row = rows.get(i);
+        assertEquals(row.length, 1);
+        assertEquals(row[0], BASE_BIG_DECIMAL.add(BigDecimal.valueOf(i)));
+      }
+    }
+    {
+      int limit = 40;
+      String query = String.format("SELECT DISTINCT %s FROM testTable ORDER BY %s LIMIT %d",
+          BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN, limit);
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema,
+          new DataSchema(new String[]{BIG_DECIMAL_COLUMN}, new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), limit);
+      for (int i = 0; i < limit; i++) {
+        Object[] row = rows.get(i);
+        assertEquals(row.length, 1);
+        assertEquals(row[0], BASE_BIG_DECIMAL.add(BigDecimal.valueOf(i)));
+      }
+    }
+    {
+      String query = String.format("SELECT COUNT(%s) AS count FROM testTable", BIG_DECIMAL_COLUMN);
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema, new DataSchema(new String[]{"count"}, new ColumnDataType[]{ColumnDataType.LONG}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 1);
+      assertEquals((long) rows.get(0)[0], 4 * NUM_RECORDS);
+    }
+    {
+      String query = String.format("SELECT %s FROM testTable GROUP BY %s ORDER BY %s DESC",
+          BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN);
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema, new DataSchema(new String[]{BIG_DECIMAL_COLUMN},
+          new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 10);
+      for (int i = 0; i < 10; i++) {
+        Object[] row = rows.get(i);
+        assertEquals(row.length, 1);
+        assertEquals(row[0], BASE_BIG_DECIMAL.add(BigDecimal.valueOf(NUM_RECORDS - i - 1)));
+      }
+    }
+    {
+      String query = String.format("SELECT COUNT(*) AS count, %s FROM testTable GROUP BY %s ORDER BY %s DESC",
+          BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN);
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema, new DataSchema(new String[]{"count", BIG_DECIMAL_COLUMN},
+          new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.BIG_DECIMAL}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 10);
+      for (int i = 0; i < 10; i++) {
+        Object[] row = rows.get(i);
+        assertEquals(row.length, 2);
+        assertEquals(row[0], 4L);
+        assertEquals(row[1], BASE_BIG_DECIMAL.add(BigDecimal.valueOf(NUM_RECORDS - i - 1)).toPlainString());
+      }
+    }
+    {
+      String query = String.format("SELECT SUMPRECISION(%s) AS sum FROM testTable", BIG_DECIMAL_COLUMN);
+      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+      ResultTable resultTable = brokerResponse.getResultTable();
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema, new DataSchema(new String[]{"sum"}, new ColumnDataType[]{ColumnDataType.STRING}));
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 1);
+      assertEquals(new BigDecimal((String) rows.get(0)[0]).compareTo(_sum.multiply(BigDecimal.valueOf(4))), 0);
+    }
+    {
+      // This currently returns 25 rows instead of a single row!
+//      int limit = 25;
+//      String query = String.format(
+//          "SELECT SUMPRECISION(%s) AS sum FROM (SELECT %s FROM testTable ORDER BY %s LIMIT %d)",
+//          BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN, BIG_DECIMAL_COLUMN, limit);
+//      BrokerResponseNative brokerResponse = getBrokerResponse(query);
+//      ResultTable resultTable = brokerResponse.getResultTable();
+//      DataSchema dataSchema = resultTable.getDataSchema();
+//      assertEquals(dataSchema, new DataSchema(new String[]{"sum"}, new ColumnDataType[]{ColumnDataType.BIG_DECIMAL}));
+//      List<Object[]> rows = resultTable.getRows();
+//      assertEquals(rows.size(), 1);
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java
index 69b5e68861..663619166b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.segment.local.io.util;
 
 import java.io.Closeable;
+import java.math.BigDecimal;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 
 
 /**
@@ -34,6 +36,10 @@ public interface ValueReader extends Closeable {
 
   double getDouble(int index);
 
+  default BigDecimal getBigDecimal(int index, int numBytesPerValue) {
+    return BigDecimalUtils.deserialize(getBytes(index, numBytesPerValue));
+  }
+
   /**
    * NOTE: The passed in reusable buffer should have capacity of at least {@code numBytesPerValue}.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
index d4f3bb5b6b..1ad89ef13e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
@@ -22,11 +22,13 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.segment.local.io.writer.impl.MutableOffHeapByteArrayStore;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 
@@ -174,6 +176,11 @@ public class BytesOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimalUtils.deserialize(getBytesValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return BytesUtils.toHexString(getBytesValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
index e30ed4ba87..ade4e664eb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
@@ -21,9 +21,11 @@ package org.apache.pinot.segment.local.realtime.impl.dictionary;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 
@@ -153,6 +155,11 @@ public class BytesOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimalUtils.deserialize(getBytesValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return BytesUtils.toHexString(getBytesValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
index 59254e939c..f16fcadb16 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
@@ -203,6 +204,11 @@ public class DoubleOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
     return _dictIdToValue.getDouble(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getDoubleValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Double.toString(getDoubleValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
index 0917e1775e..fd5f2ecce7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.dictionary;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -186,6 +187,11 @@ public class DoubleOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     return (Double) get(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getDoubleValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Double.toString(getDoubleValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
index 1bf4d4c9d5..e94866287f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
@@ -203,6 +204,11 @@ public class FloatOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
     return getFloatValue(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getFloatValue(dictId)); // NOTE: widens float to double (0.1f -> 0.10000000149011612)
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Float.toString(getFloatValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
index 19364ab1b2..7f6efa41f9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.dictionary;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -186,6 +187,11 @@ public class FloatOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     return getFloatValue(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getFloatValue(dictId)); // NOTE: widens float to double (0.1f -> 0.10000000149011612)
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Float.toString(getFloatValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
index 80e24ef033..3adc427886 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
@@ -203,6 +204,11 @@ public class IntOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
     return getIntValue(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getIntValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Integer.toString(getIntValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
index 4043af2e2b..5302c588c1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.dictionary;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -186,6 +187,11 @@ public class IntOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     return getIntValue(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getIntValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Integer.toString(getIntValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
index f6ccb60b1e..640be25344 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
@@ -204,6 +205,11 @@ public class LongOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
     return getLongValue(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getLongValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Long.toString(getLongValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
index de75ec7baf..49b791decb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.dictionary;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -186,6 +187,11 @@ public class LongOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     return getLongValue(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getLongValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Long.toString(getLongValue(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOffHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOffHeapMutableDictionary.java
index 79783ab387..1f93459b5d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOffHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOffHeapMutableDictionary.java
@@ -22,6 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.segment.local.io.writer.impl.MutableOffHeapByteArrayStore;
@@ -162,6 +163,11 @@ public class StringOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
     return Double.parseDouble(getStringValue(dictId));
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return new BigDecimal(getStringValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return new String(_byteStore.get(dictId), UTF_8);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOnHeapMutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOnHeapMutableDictionary.java
index 1d1376a0a7..2116611ee1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOnHeapMutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/StringOnHeapMutableDictionary.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.dictionary;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -144,6 +145,11 @@ public class StringOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     return Double.parseDouble(getStringValue(dictId));
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return new BigDecimal(getStringValue(dictId));
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return (String) get(dictId);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
index b4110ee684..634bb50cb9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteOrder;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter;
@@ -35,6 +36,7 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ public class SegmentDictionaryCreator implements Closeable {
   private Long2IntOpenHashMap _longValueToIndexMap;
   private Float2IntOpenHashMap _floatValueToIndexMap;
   private Double2IntOpenHashMap _doubleValueToIndexMap;
+  private Object2IntOpenHashMap<BigDecimal> _bigDecimalValueToIndexMap;
   private Object2IntOpenHashMap<String> _stringValueToIndexMap;
   private Object2IntOpenHashMap<ByteArray> _bytesValueToIndexMap;
   private int _numBytesPerEntry = 0;
@@ -163,6 +166,26 @@ public class SegmentDictionaryCreator implements Closeable {
             numValues, sortedDoubles[0], sortedDoubles[numValues - 1]);
         return;
 
+      case BIG_DECIMAL:
+        BigDecimal[] sortedBigDecimals = (BigDecimal[]) _sortedValues;
+        numValues = sortedBigDecimals.length;
+
+        Preconditions.checkState(numValues > 0);
+        _bigDecimalValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
+
+        for (int i = 0; i < numValues; i++) {
+          BigDecimal value = sortedBigDecimals[i];
+          _bigDecimalValueToIndexMap.put(value, i);
+          _numBytesPerEntry = Math.max(_numBytesPerEntry, BigDecimalUtils.byteSize(value));
+        }
+
+        writeBytesValueDictionary(sortedBigDecimals);
+        LOGGER.info(
+            "Created dictionary for BIG_DECIMAL column: {}"
+                + " with cardinality: {}, max length in bytes: {}, range: {} to {}",
+            _columnName, numValues, _numBytesPerEntry, sortedBigDecimals[0], sortedBigDecimals[numValues - 1]);
+        return;
+
       case STRING:
         String[] sortedStrings = (String[]) _sortedValues;
         numValues = sortedStrings.length;
@@ -242,6 +265,31 @@ public class SegmentDictionaryCreator implements Closeable {
     }
   }
 
+  private void writeBytesValueDictionary(BigDecimal[] bigDecimalValues)
+      throws IOException {
+    if (_useVarLengthDictionary) {
+      try (VarLengthValueWriter writer = new VarLengthValueWriter(_dictionaryFile, bigDecimalValues.length)) {
+        for (BigDecimal value : bigDecimalValues) {
+          writer.add(BigDecimalUtils.serialize(value));
+        }
+      }
+      LOGGER.info("Using variable length dictionary for column: {}, size: {}", _columnName, _dictionaryFile.length());
+    } else {
+      // Backward-compatible: index file is always big-endian
+      int numValues = bigDecimalValues.length;
+      try (PinotDataBuffer dataBuffer = PinotDataBuffer
+          .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
+              getClass().getSimpleName());
+          FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
+        for (int i = 0; i < bigDecimalValues.length; i++) {
+          writer.writeBytes(i, _numBytesPerEntry, BigDecimalUtils.serialize(bigDecimalValues[i]));
+        }
+      }
+      LOGGER.info("Using fixed length dictionary for column: {}, size: {}", _columnName,
+          (long) numValues * _numBytesPerEntry);
+    }
+  }
+
   public int getNumBytesPerEntry() {
     return _numBytesPerEntry;
   }
@@ -258,6 +306,8 @@ public class SegmentDictionaryCreator implements Closeable {
         return _doubleValueToIndexMap.get((double) value);
       case STRING:
         return _stringValueToIndexMap.getInt(value);
+      case BIG_DECIMAL:
+        return _bigDecimalValueToIndexMap.getInt((BigDecimal) value);
       case BYTES:
         return _bytesValueToIndexMap.get(new ByteArray((byte[]) value));
       default:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 50d0065332..1b65ea63f1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -385,11 +385,13 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
       ColumnStatistics columnProfile = _segmentStats.getColumnProfileFor(columnName);
       boolean useVarLengthDictionary = varLengthDictionaryColumns.contains(columnName);
       Object defaultNullValue = fieldSpec.getDefaultNullValue();
-      if (storedType == DataType.BYTES) {
+      if (storedType == DataType.BYTES || storedType == DataType.BIG_DECIMAL) {
         if (!columnProfile.isFixedLength()) {
           useVarLengthDictionary = true;
         }
-        defaultNullValue = new ByteArray((byte[]) defaultNullValue);
+        if (storedType == DataType.BYTES) {
+          defaultNullValue = new ByteArray((byte[]) defaultNullValue);
+        }
       }
       _indexCreationInfoMap.put(columnName,
           new ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/, useVarLengthDictionary,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
new file mode 100644
index 0000000000..1a6ed78c0a
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Set;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+
+
+/**
+ * Extension of {@link AbstractColumnStatisticsCollector} for BigDecimal column type.
+ *
+ * <p>Collects the distinct single-valued {@link BigDecimal} entries of a column together with the shortest and
+ * longest serialized size (as reported by {@link BigDecimalUtils#byteSize}). Value-based statistics (min, max,
+ * sorted unique values, cardinality, longest element) throw until {@link #seal()} has been called.
+ *
+ * <p>NOTE(review): distinct values are tracked with {@link BigDecimal#equals(Object)}, which is scale-sensitive
+ * ({@code 1.0} and {@code 1.00} are kept as two values), while {@link #seal()} orders them with
+ * {@link BigDecimal#compareTo}, which is not. Confirm that dictionary lookups built on top of these statistics
+ * tolerate entries that compare as equal.
+ */
+public class BigDecimalColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
+  // Distinct values seen so far.
+  private final Set<BigDecimal> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
+
+  // Smallest / largest serialized size among the distinct values.
+  private int _minLength = Integer.MAX_VALUE;
+  private int _maxLength = 0;
+  // Mirrors _maxLength: a single-valued row holds exactly one value.
+  private int _maxRowLength = 0;
+  // Distinct values in ascending order; populated by seal().
+  private BigDecimal[] _sortedValues;
+  private boolean _sealed = false;
+
+  // todo: remove this class if not needed.
+  public BigDecimalColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
+    super(column, statsCollectorConfig);
+  }
+
+  /**
+   * Records one single-valued entry.
+   *
+   * @param entry the column value; cast to {@link BigDecimal}
+   * @throws UnsupportedOperationException if {@code entry} is an array (multi-value entries are rejected)
+   */
+  @Override
+  public void collect(Object entry) {
+    if (entry instanceof Object[]) {
+      throw new UnsupportedOperationException("Multi-value BIG_DECIMAL column is not supported");
+    }
+    BigDecimal value = (BigDecimal) entry;
+    addressSorted(value);
+    if (_values.add(value)) {
+      updatePartition(value);
+      // Only a first occurrence can move the length bounds, so the size is not computed for repeated values.
+      int length = BigDecimalUtils.byteSize(value);
+      _minLength = Math.min(_minLength, length);
+      _maxLength = Math.max(_maxLength, length);
+      _maxRowLength = _maxLength;
+    }
+    _totalNumberOfEntries++;
+  }
+
+  /** Returns the smallest collected value; throws {@link IllegalStateException} unless sealed. */
+  @Override
+  public BigDecimal getMinValue() {
+    if (_sealed) {
+      return _sortedValues[0];
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for min value");
+  }
+
+  /** Returns the largest collected value; throws {@link IllegalStateException} unless sealed. */
+  @Override
+  public BigDecimal getMaxValue() {
+    if (_sealed) {
+      return _sortedValues[_sortedValues.length - 1];
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for max value");
+  }
+
+  /** Returns the distinct values in ascending order; throws {@link IllegalStateException} unless sealed. */
+  @Override
+  public BigDecimal[] getUniqueValuesSet() {
+    if (_sealed) {
+      return _sortedValues;
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for unique values set");
+  }
+
+  /** Returns the smallest serialized size seen, or {@link Integer#MAX_VALUE} if nothing was collected. */
+  @Override
+  public int getLengthOfShortestElement() {
+    return _minLength;
+  }
+
+  /** Returns the largest serialized size seen; throws {@link IllegalStateException} unless sealed. */
+  @Override
+  public int getLengthOfLargestElement() {
+    if (_sealed) {
+      return _maxLength;
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for longest value");
+  }
+
+  /** Returns the largest row size, which this collector keeps equal to the largest element size. */
+  @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  /** Returns the number of distinct values; throws {@link IllegalStateException} unless sealed. */
+  @Override
+  public int getCardinality() {
+    if (_sealed) {
+      return _sortedValues.length;
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for cardinality");
+  }
+
+  /** Always returns {@code false}. */
+  @Override
+  public boolean hasNull() {
+    return false;
+  }
+
+  /** Snapshots the distinct values into an ascending array and marks the collector as sealed. */
+  @Override
+  public void seal() {
+    _sortedValues = _values.toArray(new BigDecimal[0]);
+    Arrays.sort(_sortedValues);
+    _sealed = true;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
index 023271037a..0ac7e4f7a4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
@@ -61,6 +61,10 @@ public class SegmentPreIndexStatsCollectorImpl implements SegmentPreIndexStatsCo
         case DOUBLE:
           _columnStatsCollectorMap.put(column, new DoubleColumnPreIndexStatsCollector(column, _statsCollectorConfig));
           break;
+        case BIG_DECIMAL:
+          _columnStatsCollectorMap.put(column,
+              new BigDecimalColumnPreIndexStatsCollector(column, _statsCollectorConfig));
+          break;
         case STRING:
           _columnStatsCollectorMap.put(column, new StringColumnPreIndexStatsCollector(column, _statsCollectorConfig));
           break;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 17f58571c5..16a1ee2a5c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
@@ -246,8 +247,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
         return (loadOnHeap) ? new OnHeapDoubleDictionary(dictionaryBuffer, length)
             : new DoubleDictionary(dictionaryBuffer, length);
 
-      case STRING:
+      case BIG_DECIMAL:
         int numBytesPerValue = metadata.getColumnMaxLength();
+        return new BigDecimalDictionary(dictionaryBuffer, length, numBytesPerValue);
+
+      case STRING:
+        numBytesPerValue = metadata.getColumnMaxLength();
         byte paddingByte = (byte) metadata.getPaddingCharacter();
         return loadOnHeap ? new OnHeapStringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte)
             : new StringDictionary(dictionaryBuffer, length, numBytesPerValue, paddingByte);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
index 6942a410d6..1518b6399e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.readers;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter;
 import org.apache.pinot.segment.local.io.util.ValueReader;
@@ -189,6 +190,24 @@ public abstract class BaseImmutableDictionary implements Dictionary {
     return -(low + 1);
   }
 
+  protected int binarySearch(BigDecimal value) {
+    int lo = 0;
+    int hi = _length - 1;
+    while (lo <= hi) {
+      int probe = (lo + hi) >>> 1; // unsigned shift keeps the midpoint valid even if lo + hi overflows
+      int cmp = _valueReader.getBigDecimal(probe, _numBytesPerValue).compareTo(value);
+      if (cmp == 0) {
+        return probe;
+      }
+      if (cmp < 0) {
+        lo = probe + 1;
+      } else {
+        hi = probe - 1;
+      }
+    }
+    return -(lo + 1); // miss: encode the insertion point as -(insertionIndex + 1)
+  }
+
   /**
    * WARNING: With non-zero padding byte, binary search result might not reflect the real insertion index for the value.
    * E.g. with padding byte 'b', if unpadded value "aa" is in the dictionary, and stored as "aab", then unpadded value
@@ -281,6 +300,10 @@ public abstract class BaseImmutableDictionary implements Dictionary {
     return _valueReader.getDouble(dictId);
   }
 
+  protected BigDecimal getBigDecimal(int dictId) {
+    return _valueReader.getBigDecimal(dictId, _numBytesPerValue); // delegates to the dictionary's value reader
+  }
+
   protected String getUnpaddedString(int dictId, byte[] buffer) {
     return _valueReader.getUnpaddedString(dictId, _numBytesPerValue, _paddingByte, buffer);
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
similarity index 64%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
index b54256a587..1aba0dde3a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BigDecimalDictionary.java
@@ -18,74 +18,75 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 
 
 /**
- * Extension of {@link BaseImmutableDictionary} that implements immutable dictionary for byte[] type.
+ * Extension of {@link BaseImmutableDictionary} that implements immutable dictionary for BigDecimal type.
  */
-public class BytesDictionary extends BaseImmutableDictionary {
+public class BigDecimalDictionary extends BaseImmutableDictionary {
 
-  public BytesDictionary(PinotDataBuffer dataBuffer, int length, int numBytesPerValue) {
+  public BigDecimalDictionary(PinotDataBuffer dataBuffer, int length, int numBytesPerValue) {
+    // Works with VarLengthValueBuffer only.
     super(dataBuffer, length, numBytesPerValue, (byte) 0);
   }
 
   @Override
   public DataType getValueType() {
-    return DataType.BYTES;
+    return DataType.BIG_DECIMAL;
   }
 
   @Override
   public int insertionIndexOf(String stringValue) {
-    return binarySearch(BytesUtils.toBytes(stringValue));
+    return binarySearch(new BigDecimal(stringValue)); // throws NumberFormatException if not numeric
   }
 
   @Override
-  public ByteArray getMinVal() {
-    return new ByteArray(getBytes(0));
+  public BigDecimal getMinVal() {
+    return BigDecimalUtils.deserialize(getBytes(0)); // first entry; presumably the smallest value - verify
   }
 
   @Override
-  public ByteArray getMaxVal() {
-    return new ByteArray(getBytes(length() - 1));
+  public BigDecimal getMaxVal() {
+    return BigDecimalUtils.deserialize(getBytes(length() - 1)); // last entry; presumably the largest value - verify
   }
 
   @Override
-  public byte[] get(int dictId) {
-    return getBytes(dictId);
-  }
-
-  @Override
-  public Object getInternal(int dictId) {
-    return new ByteArray(getBytes(dictId));
+  public BigDecimal get(int dictId) {
+    return getBigDecimal(dictId); // same lookup as getBigDecimalValue(dictId)
   }
 
   @Override
   public int getIntValue(int dictId) {
-    throw new UnsupportedOperationException();
+    return get(dictId).intValue(); // narrowing: fraction dropped; out-of-range values keep only the low 32 bits
   }
 
   @Override
   public long getLongValue(int dictId) {
-    throw new UnsupportedOperationException();
+    return get(dictId).longValue(); // narrowing: fraction dropped; out-of-range values keep only the low 64 bits
   }
 
   @Override
   public float getFloatValue(int dictId) {
-    throw new UnsupportedOperationException();
+    return get(dictId).floatValue(); // may round; magnitudes beyond the float range become +/-Infinity
   }
 
   @Override
   public double getDoubleValue(int dictId) {
-    throw new UnsupportedOperationException();
+    return get(dictId).doubleValue(); // may round; magnitudes beyond the double range become +/-Infinity
+  }
+
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return getBigDecimal(dictId); // native type of this dictionary: no conversion involved
   }
 
   @Override
   public String getStringValue(int dictId) {
-    return BytesUtils.toHexString(getBytes(dictId));
+    return get(dictId).toPlainString(); // plain decimal notation, never an exponent
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
index b54256a587..ad78a724ae 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BytesDictionary.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 
@@ -83,6 +85,11 @@ public class BytesDictionary extends BaseImmutableDictionary {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimalUtils.deserialize(getBytes(dictId)); // assumes BigDecimalUtils-serialized bytes; verify
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return BytesUtils.toHexString(getBytes(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
index a3ba543879..1011c854f8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueBytesDictionary.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 
@@ -96,6 +98,11 @@ public class ConstantValueBytesDictionary extends BaseImmutableDictionary {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimalUtils.deserialize(_value); // assumes BigDecimalUtils-serialized bytes; verify
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return BytesUtils.toHexString(_value);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
index 9adface3d0..d484bec2f2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueDoubleDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -89,6 +90,11 @@ public class ConstantValueDoubleDictionary extends BaseImmutableDictionary {
     return _value;
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_value); // throws NumberFormatException for NaN or infinite values
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Double.toString(_value);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
index 01bbc8c563..7db856074f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueFloatDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -89,6 +90,11 @@ public class ConstantValueFloatDictionary extends BaseImmutableDictionary {
     return _value;
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_value); // TODO confirm: widens to double, so 0.1f -> 0.10000000149011612
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Float.toString(_value);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
index 288e8efb9f..e7366422f6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueIntDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -89,6 +90,11 @@ public class ConstantValueIntDictionary extends BaseImmutableDictionary {
     return _value;
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_value); // lossless: every int is exactly representable
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Integer.toString(_value);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
index 60edf6cddc..94c50eb14f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueLongDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -89,6 +90,11 @@ public class ConstantValueLongDictionary extends BaseImmutableDictionary {
     return _value;
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_value); // lossless: every long is exactly representable
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Long.toString(_value);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
index 98617c6c39..6e0e854766 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/ConstantValueStringDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BytesUtils;
 
@@ -90,6 +91,11 @@ public class ConstantValueStringDictionary extends BaseImmutableDictionary {
     return Double.parseDouble(_value);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return new BigDecimal(_value); // throws NumberFormatException if not numeric
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return _value;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DocIdDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DocIdDictionary.java
index d075f53ec9..59e752a8b8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DocIdDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DocIdDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -74,6 +75,11 @@ public class DocIdDictionary extends BaseImmutableDictionary {
     return dictId;
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(dictId); // the value is the dictId itself, converted losslessly
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Integer.toString(dictId);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
index 029752b5b9..96812e91c9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DoubleDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -63,6 +64,11 @@ public class DoubleDictionary extends BaseImmutableDictionary {
     return getDouble(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getDouble(dictId)); // throws NumberFormatException for NaN or infinite values
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Double.toString(getDouble(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
index 65a33df1b0..ede96d682a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/FloatDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -63,6 +64,11 @@ public class FloatDictionary extends BaseImmutableDictionary {
     return getFloat(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getFloat(dictId)); // TODO confirm: widens to double, so 0.1f -> 0.10000000149011612
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Float.toString(getFloat(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
index 8d11ccdf9d..8fbbe9c173 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/IntDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -63,6 +64,11 @@ public class IntDictionary extends BaseImmutableDictionary {
     return getInt(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getInt(dictId)); // lossless: every int is exactly representable
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Integer.toString(getInt(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
index df489be167..2c24190a6b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/LongDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -63,6 +64,11 @@ public class LongDictionary extends BaseImmutableDictionary {
     return getLong(dictId);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(getLong(dictId)); // lossless: every long is exactly representable
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Long.toString(getLong(dictId));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
index e0a0876e16..4b93443c4b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapDoubleDictionary.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -101,6 +102,11 @@ public class OnHeapDoubleDictionary extends OnHeapDictionary {
     return _dictIdToVal[dictId];
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_dictIdToVal[dictId]); // throws NumberFormatException for NaN or infinite values
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Double.toString(_dictIdToVal[dictId]);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
index f90d0d1728..5a6e0ff380 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapFloatDictionary.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import it.unimi.dsi.fastutil.floats.Float2IntOpenHashMap;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -101,6 +102,11 @@ public class OnHeapFloatDictionary extends OnHeapDictionary {
     return _dictIdToVal[dictId];
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_dictIdToVal[dictId]); // TODO confirm: widens to double, so 0.1f -> 0.10000000149011612
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Float.toString(_dictIdToVal[dictId]);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
index 85335bdd40..a839622114 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapIntDictionary.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -101,6 +102,11 @@ public class OnHeapIntDictionary extends OnHeapDictionary {
     return _dictIdToVal[dictId];
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_dictIdToVal[dictId]); // lossless: every int is exactly representable
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Integer.toString(_dictIdToVal[dictId]);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
index 43cc5776fe..7f4097a39d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapLongDictionary.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -101,6 +102,11 @@ public class OnHeapLongDictionary extends OnHeapDictionary {
     return _dictIdToVal[dictId];
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return BigDecimal.valueOf(_dictIdToVal[dictId]); // lossless: every long is exactly representable
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return Long.toString(_dictIdToVal[dictId]);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
index 58cea67eeb..d0d07629d9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -119,6 +120,11 @@ public class OnHeapStringDictionary extends OnHeapDictionary {
     return Double.parseDouble(_unpaddedStrings[dictId]);
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return new BigDecimal(_unpaddedStrings[dictId]); // throws NumberFormatException if not numeric
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return _unpaddedStrings[dictId];
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
index 91ee27468d..1df28c7549 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BytesUtils;
@@ -64,6 +65,11 @@ public class StringDictionary extends BaseImmutableDictionary {
     return Double.parseDouble(getUnpaddedString(dictId, getBuffer()));
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return new BigDecimal(getUnpaddedString(dictId, getBuffer())); // NumberFormatException on non-numeric text
+  }
+
   @Override
   public String getStringValue(int dictId) {
     return getUnpaddedString(dictId, getBuffer());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/FixedIntArrayOffHeapIdMap.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/FixedIntArrayOffHeapIdMap.java
index 28c269d834..27e1166f2b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/FixedIntArrayOffHeapIdMap.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/FixedIntArrayOffHeapIdMap.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.utils;
 
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.segment.local.io.readerwriter.impl.FixedByteSingleValueMultiColumnReaderWriter;
 import org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
@@ -165,6 +166,11 @@ public class FixedIntArrayOffHeapIdMap extends BaseOffHeapMutableDictionary impl
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    throw new UnsupportedOperationException(); // not supported, like the neighbouring typed getters
+  }
+
   @Override
   public String getStringValue(int dictId) {
     throw new UnsupportedOperationException();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
index 14abee6015..c55970413e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.creator;
 
 import java.io.File;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,12 +36,14 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
@@ -55,6 +58,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.ReadMode;
@@ -164,6 +168,10 @@ public class DictionariesTest {
           Assert.assertTrue(heapDictionary instanceof DoubleDictionary);
           Assert.assertTrue(mmapDictionary instanceof DoubleDictionary);
           break;
+        case BIG_DECIMAL:
+          Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary);
+          Assert.assertTrue(mmapDictionary instanceof BigDecimalDictionary);
+          break;
         case STRING:
           Assert.assertTrue(heapDictionary instanceof StringDictionary);
           Assert.assertTrue(mmapDictionary instanceof StringDictionary);
@@ -306,6 +314,33 @@ public class DictionariesTest {
     Assert.assertFalse(statsCollector.isSorted());
   }
 
+  @Test
+  public void testBigDecimalColumnPreIndexStatsCollector() {
+    AbstractColumnStatisticsCollector statsCollector =
+        buildStatsCollector("column1", DataType.BIG_DECIMAL, false); // false: column is registered as a metric field
+    statsCollector.collect(BigDecimal.valueOf(1d));
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(2d));
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(3d));
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(4d));
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(4d)); // same value again: still sorted
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(2d)); // drops below the previous value
+    Assert.assertFalse(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(40d)); // later increases do not restore sortedness
+    Assert.assertFalse(statsCollector.isSorted());
+    statsCollector.collect(BigDecimal.valueOf(20d));
+    Assert.assertFalse(statsCollector.isSorted());
+    statsCollector.seal();
+    Assert.assertEquals(statsCollector.getCardinality(), 6); // distinct values: 1, 2, 3, 4, 20, 40
+    Assert.assertEquals(((Number) statsCollector.getMinValue()).intValue(), 1);
+    Assert.assertEquals(((Number) statsCollector.getMaxValue()).intValue(), 40);
+    Assert.assertFalse(statsCollector.isSorted());
+  }
+
   @Test
   public void testStringColumnPreIndexStatsCollectorForRandomString() {
     AbstractColumnStatisticsCollector statsCollector = buildStatsCollector("column1", DataType.STRING);
@@ -446,7 +481,25 @@ public class DictionariesTest {
     Schema schema = new Schema();
     schema.addField(new DimensionFieldSpec(column, dataType, true));
     StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(_tableConfig, schema, null);
+    return buildStatsCollector(column, dataType, statsCollectorConfig);
+  }
+
+  private AbstractColumnStatisticsCollector buildStatsCollector(String column, DataType dataType,
+      boolean isDimensionField) {
+    // Dimension columns reuse the two-argument overload; metric columns get a schema with a single-value metric field
+    if (isDimensionField) {
+      return buildStatsCollector(column, dataType);
+    }
+    MetricFieldSpec metricSpec = new MetricFieldSpec(column, dataType);
+    metricSpec.setSingleValueField(true);
+    Schema metricSchema = new Schema();
+    metricSchema.addField(metricSpec);
+    StatsCollectorConfig metricConfig = new StatsCollectorConfig(_tableConfig, metricSchema, null);
+    return buildStatsCollector(column, dataType, metricConfig);
+  }
 
+  private AbstractColumnStatisticsCollector buildStatsCollector(String column, DataType dataType,
+      StatsCollectorConfig statsCollectorConfig) {
     switch (dataType) {
       case INT:
         return new IntColumnPreIndexStatsCollector(column, statsCollectorConfig);
@@ -456,6 +509,8 @@ public class DictionariesTest {
         return new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig);
       case DOUBLE:
         return new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig);
+      case BIG_DECIMAL:
+        return new BigDecimalColumnPreIndexStatsCollector(column, statsCollectorConfig);
       case BOOLEAN:
       case STRING:
         return new StringColumnPreIndexStatsCollector(column, statsCollectorConfig);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
index decd434022..182f07da34 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
@@ -23,10 +23,12 @@ import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import java.io.File;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeSet;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
@@ -34,6 +36,8 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.testng.Assert;
@@ -51,6 +55,7 @@ public class ImmutableDictionaryTest {
   private static final String LONG_COLUMN_NAME = "longColumn";
   private static final String FLOAT_COLUMN_NAME = "floatColumn";
   private static final String DOUBLE_COLUMN_NAME = "doubleColumn";
+  private static final String BIG_DECIMAL_COLUMN_NAME = "bigDecimalColumn";
   private static final String STRING_COLUMN_NAME = "stringColumn";
   private static final String BYTES_COLUMN_NAME = "bytesColumn";
   private static final int NUM_VALUES = 1000;
@@ -61,6 +66,8 @@ public class ImmutableDictionaryTest {
   private long[] _longValues;
   private float[] _floatValues;
   private double[] _doubleValues;
+  private BigDecimal[] _bigDecimalValues;
+  private int _bigDecimalByteLength;
   private String[] _stringValues;
   private ByteArray[] _bytesValues;
 
@@ -99,6 +106,15 @@ public class ImmutableDictionaryTest {
     _doubleValues = doubleSet.toDoubleArray();
     Arrays.sort(_doubleValues);
 
+    TreeSet<BigDecimal> bigDecimalSet = new TreeSet<>();
+    while (bigDecimalSet.size() < NUM_VALUES) {
+      BigDecimal bigDecimal = BigDecimal.valueOf(RANDOM.nextDouble());
+      _bigDecimalByteLength = Math.max(_bigDecimalByteLength, BigDecimalUtils.byteSize(bigDecimal));
+      bigDecimalSet.add(bigDecimal);
+    }
+    _bigDecimalValues = bigDecimalSet.toArray(new BigDecimal[0]);
+    Arrays.sort(_bigDecimalValues);
+
     Set<String> stringSet = new HashSet<>();
     while (stringSet.size() < NUM_VALUES) {
       stringSet.add(RandomStringUtils.random(RANDOM.nextInt(MAX_STRING_LENGTH)).replace('\0', ' '));
@@ -135,6 +151,17 @@ public class ImmutableDictionaryTest {
       dictionaryCreator.build();
     }
 
+    // Note: BigDecimalDictionary requires setting useVarLengthDictionary to true.
+    boolean useVarLengthDictionary = true;
+    MetricFieldSpec bigDecimalMetricField =
+        new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, FieldSpec.DataType.BIG_DECIMAL);
+    bigDecimalMetricField.setSingleValueField(true);
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_bigDecimalValues,
+        bigDecimalMetricField, TEMP_DIR,
+        useVarLengthDictionary)) {
+      dictionaryCreator.build();
+    }
+
     try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_stringValues,
         new DimensionFieldSpec(STRING_COLUMN_NAME, FieldSpec.DataType.STRING, true), TEMP_DIR)) {
       dictionaryCreator.build();
@@ -296,6 +323,34 @@ public class ImmutableDictionaryTest {
     }
   }
 
+  @Test
+  public void testBigDecimalDictionary()
+      throws Exception {
+    File dictionaryFile = new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION);
+    try (BigDecimalDictionary dictionary = new BigDecimalDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(dictionaryFile), NUM_VALUES, _bigDecimalByteLength)) {
+      testBigDecimalDictionary(dictionary);
+    }
+  }
+
+  private void testBigDecimalDictionary(BaseImmutableDictionary bigDecimalDictionary) {
+    for (int dictId = 0; dictId < NUM_VALUES; dictId++) {
+      BigDecimal expected = _bigDecimalValues[dictId];
+      assertEquals(bigDecimalDictionary.get(dictId), expected);
+      assertEquals(bigDecimalDictionary.getIntValue(dictId), expected.intValue());
+      assertEquals(bigDecimalDictionary.getLongValue(dictId), expected.longValue());
+      assertEquals(bigDecimalDictionary.getFloatValue(dictId), expected.floatValue());
+      assertEquals(bigDecimalDictionary.getDoubleValue(dictId), expected.doubleValue());
+      assertEquals(bigDecimalDictionary.getBigDecimalValue(dictId), expected);
+      Assert.assertEquals(new BigDecimal(bigDecimalDictionary.getStringValue(dictId)), expected);
+      assertEquals(bigDecimalDictionary.indexOf(expected.toString()), dictId);
+      // A random probe must land where a binary search over the sorted expected values puts it
+      BigDecimal probe = BigDecimal.valueOf(RANDOM.nextDouble());
+      int actualInsertionIndex = bigDecimalDictionary.insertionIndexOf(probe.toString());
+      assertEquals(actualInsertionIndex, Arrays.binarySearch(_bigDecimalValues, probe));
+    }
+  }
+
   @Test
   public void testStringDictionary()
       throws Exception {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
index 2c2e1dc059..31b17677b3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.segment.index.readers;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.io.File;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
@@ -28,7 +29,9 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.testng.Assert;
@@ -46,6 +49,7 @@ public class ImmutableDictionaryTypeConversionTest {
   private static final String LONG_COLUMN_NAME = "longColumn";
   private static final String FLOAT_COLUMN_NAME = "floatColumn";
   private static final String DOUBLE_COLUMN_NAME = "doubleColumn";
+  private static final String BIG_DECIMAL_COLUMN_NAME = "bigDecimalColumn";
   private static final String STRING_COLUMN_NAME = "stringColumn";
   private static final String BYTES_COLUMN_NAME = "bytesColumn";
   private static final int NUM_VALUES = 1000;
@@ -59,6 +63,8 @@ public class ImmutableDictionaryTypeConversionTest {
   private long[] _longValues;
   private float[] _floatValues;
   private double[] _doubleValues;
+  private BigDecimal[] _bigDecimalValues;
+  private int _bigDecimalByteLength;
   private String[] _stringValues;
   private ByteArray[] _bytesValues;
 
@@ -67,6 +73,7 @@ public class ImmutableDictionaryTypeConversionTest {
   private long[] _longValuesBuffer;
   private float[] _floatValuesBuffer;
   private double[] _doubleValuesBuffer;
+  private BigDecimal[] _bigDecimalValuesBuffer;
   private String[] _stringValuesBuffer;
   private byte[][] _bytesValuesBuffer;
 
@@ -91,6 +98,12 @@ public class ImmutableDictionaryTypeConversionTest {
     _doubleValues = new double[NUM_VALUES];
     ArrayCopyUtils.copy(_intValues, _doubleValues, NUM_VALUES);
 
+    _bigDecimalValues = new BigDecimal[NUM_VALUES];
+    ArrayCopyUtils.copy(_intValues, _bigDecimalValues, NUM_VALUES);
+    for (BigDecimal bigDecimal : _bigDecimalValues) {
+      _bigDecimalByteLength = Math.max(_bigDecimalByteLength, BigDecimalUtils.byteSize(bigDecimal));
+    }
+
     _stringValues = new String[NUM_VALUES];
     ArrayCopyUtils.copy(_intValues, _stringValues, NUM_VALUES);
 
@@ -119,6 +132,14 @@ public class ImmutableDictionaryTypeConversionTest {
       dictionaryCreator.build();
     }
 
+    MetricFieldSpec bigDecimalMetricField = new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME,
+        FieldSpec.DataType.BIG_DECIMAL);
+    bigDecimalMetricField.setSingleValueField(true);
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_bigDecimalValues,
+        bigDecimalMetricField, TEMP_DIR)) {
+      dictionaryCreator.build();
+    }
+
     try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_stringValues,
         new DimensionFieldSpec(STRING_COLUMN_NAME, FieldSpec.DataType.STRING, true), TEMP_DIR)) {
       dictionaryCreator.build();
@@ -139,6 +160,7 @@ public class ImmutableDictionaryTypeConversionTest {
     _longValuesBuffer = new long[NUM_VALUES];
     _floatValuesBuffer = new float[NUM_VALUES];
     _doubleValuesBuffer = new double[NUM_VALUES];
+    _bigDecimalValuesBuffer = new BigDecimal[NUM_VALUES];
     _stringValuesBuffer = new String[NUM_VALUES];
     _bytesValuesBuffer = new byte[NUM_VALUES][];
   }
@@ -223,6 +245,16 @@ public class ImmutableDictionaryTypeConversionTest {
     }
   }
 
+  @Test
+  public void testBigDecimalDictionary()
+      throws Exception {
+    File dictionaryFile = new File(TEMP_DIR, BIG_DECIMAL_COLUMN_NAME + V1Constants.Dict.FILE_EXTENSION);
+    try (BigDecimalDictionary dictionary = new BigDecimalDictionary(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(dictionaryFile), NUM_VALUES, _bigDecimalByteLength)) {
+      testNumericDictionary(dictionary);
+    }
+  }
+
   private void testNumericDictionary(BaseImmutableDictionary dictionary) {
     for (int i = 0; i < NUM_VALUES; i++) {
       Assert.assertEquals(((Number) dictionary.get(i)).intValue(), _intValues[i]);
@@ -240,6 +272,10 @@ public class ImmutableDictionaryTypeConversionTest {
     Assert.assertEquals(_floatValuesBuffer, _floatValues);
     dictionary.readDoubleValues(_dictIds, NUM_VALUES, _doubleValuesBuffer);
     Assert.assertEquals(_doubleValuesBuffer, _doubleValues);
+    dictionary.readBigDecimalValues(_dictIds, NUM_VALUES, _bigDecimalValuesBuffer);
+    for (int i = 0; i < _bigDecimalValuesBuffer.length; i++) {
+      Assert.assertEquals(_bigDecimalValuesBuffer[i].compareTo(_bigDecimalValues[i]), 0);
+    }
     dictionary.readStringValues(_dictIds, NUM_VALUES, _stringValuesBuffer);
     for (int i = 0; i < NUM_VALUES; i++) {
       Assert.assertEquals(Double.parseDouble(_stringValuesBuffer[i]), _doubleValues[i]);
@@ -247,13 +283,17 @@ public class ImmutableDictionaryTypeConversionTest {
 
     try {
       dictionary.getBytesValue(0);
-      Assert.fail();
+      if (dictionary.getClass() != BigDecimalDictionary.class) {
+        Assert.fail();
+      }
     } catch (UnsupportedOperationException e) {
       // Expected
     }
     try {
       dictionary.readBytesValues(_dictIds, NUM_VALUES, _bytesValuesBuffer);
-      Assert.fail();
+      if (dictionary.getClass() != BigDecimalDictionary.class) {
+        Assert.fail();
+      }
     } catch (UnsupportedOperationException e) {
       // Expected
     }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
index 64badd3169..6bf04ca7d6 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.spi.evaluator;
 
+import java.math.BigDecimal;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -76,6 +77,18 @@ public interface TransformEvaluator {
   <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
       T context, Dictionary dictionary, int[] dictIdBuffer, double[] valueBuffer);
 
+  /**
+   * Evaluate the JSON path and fill the value buffer with BigDecimal values (dictionary/dictIdBuffer: TODO document)
+   * @param docIds the doc ids to evaluate the JSON path for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param valueBuffer the BigDecimal values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
+      T context, Dictionary dictionary, int[] dictIdBuffer, BigDecimal[] valueBuffer);
+
   /**
    * Evaluate the JSON path and fill the value buffer
    * @param docIds the doc ids to evaluate the JSON path for
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
index d889baf60d..e68e60013f 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.spi.index.metadata;
 
+import java.math.BigDecimal;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -261,6 +262,10 @@ public class ColumnMetadataImpl implements ColumnMetadata {
           builder.setMinValue(Double.valueOf(minString));
           builder.setMaxValue(Double.valueOf(maxString));
           break;
+        case BIG_DECIMAL:
+          builder.setMinValue(new BigDecimal(minString));
+          builder.setMaxValue(new BigDecimal(maxString));
+          break;
         case STRING:
           builder.setMinValue(minString);
           builder.setMaxValue(maxString);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
index 6359b900a5..fb2ecf6cce 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.spi.index.reader;
 
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.io.Closeable;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -132,6 +133,8 @@ public interface Dictionary extends Closeable {
 
   double getDoubleValue(int dictId);
 
+  BigDecimal getBigDecimalValue(int dictId); // value at dictId as a BigDecimal (conversion is implementation-specific)
+
   String getStringValue(int dictId);
 
   /**
@@ -171,6 +174,12 @@ public interface Dictionary extends Closeable {
     }
   }
 
+  default void readBigDecimalValues(int[] dictIds, int length, BigDecimal[] outValues) {
+    for (int i = 0; i < length; i++) {
+      outValues[i] = getBigDecimalValue(dictIds[i]); // one lookup per dict id, first length entries only
+    }
+  }
+
   default void readStringValues(int[] dictIds, int length, String[] outValues) {
     for (int i = 0; i < length; i++) {
       outValues[i] = getStringValue(dictIds[i]);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index b0d74f1422..3bd6642a71 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.segment.spi.index.reader;
 
 import java.io.Closeable;
+import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 
 
 /**
@@ -258,6 +260,52 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     }
   }
 
+  /**
+   * Fills the values as BigDecimal, converting each entry from the value type reported by getValueType()
+   * @param docIds Array containing the document ids to read
+   * @param length Number of values to read
+   * @param values Values to fill; an unsupported stored value type raises IllegalArgumentException
+   * @param context Reader context
+   */
+  default void readValuesSV(int[] docIds, int length, BigDecimal[] values, T context) {
+    // todo(nhejazi): add raw index support to the BIG_DECIMAL type. In most of the cases, it will be more efficient
+    //  to store big decimal as raw.
+    switch (getValueType()) {
+      case INT:
+        for (int i = 0; i < length; i++) {
+          values[i] = BigDecimal.valueOf(getInt(docIds[i], context));
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < length; i++) {
+          values[i] = BigDecimal.valueOf(getLong(docIds[i], context));
+        }
+        break;
+      case FLOAT: // NOTE(review): the float is widened to double first, so 0.1f yields 0.10000000149011612 - confirm
+        for (int i = 0; i < length; i++) {
+          values[i] = BigDecimal.valueOf(getFloat(docIds[i], context));
+        }
+        break;
+      case DOUBLE: // BigDecimal.valueOf(double) throws NumberFormatException for NaN and infinities
+        for (int i = 0; i < length; i++) {
+          values[i] = BigDecimal.valueOf(getDouble(docIds[i], context));
+        }
+        break;
+      case STRING: // throws NumberFormatException if a stored string is not a valid decimal
+        for (int i = 0; i < length; i++) {
+          values[i] = new BigDecimal(getString(docIds[i], context));
+        }
+        break;
+      case BYTES: // presumably bytes written by the matching BigDecimalUtils serializer - TODO confirm
+        for (int i = 0; i < length; i++) {
+          values[i] = BigDecimalUtils.deserialize(getBytes(docIds[i], context));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Cannot read BIG_DECIMAL values from value type: " + getValueType());
+    }
+  }
+
   /**
    * Reads the INT value at the given document id.
    *
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index 55e2add378..ea255329d9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.BooleanUtils;
@@ -59,6 +60,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
   public static final Long DEFAULT_METRIC_NULL_VALUE_OF_LONG = 0L;
   public static final Float DEFAULT_METRIC_NULL_VALUE_OF_FLOAT = 0.0F;
   public static final Double DEFAULT_METRIC_NULL_VALUE_OF_DOUBLE = 0.0D;
+  public static final BigDecimal DEFAULT_METRIC_NULL_VALUE_OF_BIG_DECIMAL = BigDecimal.ZERO;
   public static final String DEFAULT_METRIC_NULL_VALUE_OF_STRING = "null";
   public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES = new byte[0];
 
@@ -205,6 +207,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
               return DEFAULT_METRIC_NULL_VALUE_OF_FLOAT;
             case DOUBLE:
               return DEFAULT_METRIC_NULL_VALUE_OF_DOUBLE;
+            case BIG_DECIMAL:
+              // todo(nhejazi): update documentation w/ default null values.
+              return DEFAULT_METRIC_NULL_VALUE_OF_BIG_DECIMAL;
             case STRING:
               return DEFAULT_METRIC_NULL_VALUE_OF_STRING;
             case BYTES:
@@ -299,6 +304,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
         case DOUBLE:
           jsonNode.put(key, (Double) _defaultNullValue);
           break;
+        case BIG_DECIMAL:
+          jsonNode.put(key, (BigDecimal) _defaultNullValue);
+          break;
         case BOOLEAN:
           jsonNode.put(key, (Integer) _defaultNullValue == 1);
           break;
@@ -379,6 +387,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
     LONG(Long.BYTES, true, true),
     FLOAT(Float.BYTES, true, true),
     DOUBLE(Double.BYTES, true, true),
+    BIG_DECIMAL(true, true),
     BOOLEAN(INT, false, true),
     TIMESTAMP(LONG, false, true),
     STRING(false, true),
@@ -463,6 +472,8 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
             return Float.valueOf(value);
           case DOUBLE:
             return Double.valueOf(value);
+          case BIG_DECIMAL:
+            return new BigDecimal(value);
           case BOOLEAN:
             return BooleanUtils.toInt(value);
           case TIMESTAMP:
@@ -494,6 +505,8 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
             return Float.valueOf(value);
           case DOUBLE:
             return Double.valueOf(value);
+          case BIG_DECIMAL:
+            return new BigDecimal(value);
           case BOOLEAN:
             return BooleanUtils.toInt(value);
           case TIMESTAMP:
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index c692904ed3..e8458957f9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -447,6 +447,7 @@ public final class Schema implements Serializable {
             case LONG:
             case FLOAT:
             case DOUBLE:
+            case BIG_DECIMAL:
             case BOOLEAN:
             case TIMESTAMP:
             case STRING:
@@ -464,6 +465,7 @@ public final class Schema implements Serializable {
             case LONG:
             case FLOAT:
             case DOUBLE:
+            case BIG_DECIMAL:
             case BYTES:
               break;
             default:
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ArrayCopyUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ArrayCopyUtils.java
index 42e264fb51..acd5ba9b8b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ArrayCopyUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ArrayCopyUtils.java
@@ -52,6 +52,12 @@ public class ArrayCopyUtils {
     }
   }
 
+  public static void copy(int[] src, BigDecimal[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = BigDecimal.valueOf(src[i]); // exact: resolves to valueOf(long), scale 0
+    }
+  }
+
   public static void copy(int[] src, String[] dest, int length) {
     for (int i = 0; i < length; i++) {
       dest[i] = Integer.toString(src[i]);
@@ -76,6 +82,12 @@ public class ArrayCopyUtils {
     }
   }
 
+  public static void copy(long[] src, BigDecimal[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = BigDecimal.valueOf(src[i]); // exact: valueOf(long), scale 0
+    }
+  }
+
   public static void copy(long[] src, String[] dest, int length) {
     for (int i = 0; i < length; i++) {
       dest[i] = Long.toString(src[i]);
@@ -100,6 +112,12 @@ public class ArrayCopyUtils {
     }
   }
 
+  public static void copy(float[] src, BigDecimal[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = BigDecimal.valueOf(src[i]);
+    }
+  }
+
   public static void copy(float[] src, String[] dest, int length) {
     for (int i = 0; i < length; i++) {
       dest[i] = Float.toString(src[i]);
@@ -124,12 +142,50 @@ public class ArrayCopyUtils {
     }
   }
 
+  public static void copy(double[] src, BigDecimal[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      // Note: BigDecimal has no representation for NaN, -infinity or +infinity, so BigDecimal.valueOf(double)
+      // throws NumberFormatException for Double.NaN, Double.NEGATIVE_INFINITY and Double.POSITIVE_INFINITY.
+      dest[i] = BigDecimal.valueOf(src[i]);
+    }
+  }
+
   public static void copy(double[] src, String[] dest, int length) {
     for (int i = 0; i < length; i++) {
       dest[i] = Double.toString(src[i]);
     }
   }
 
+  public static void copy(BigDecimal[] src, int[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = src[i].intValue();
+    }
+  }
+
+  public static void copy(BigDecimal[] src, long[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = src[i].longValue();
+    }
+  }
+
+  public static void copy(BigDecimal[] src, float[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = src[i].floatValue();
+    }
+  }
+
+  public static void copy(BigDecimal[] src, double[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = src[i].doubleValue();
+    }
+  }
+
+  public static void copy(BigDecimal[] src, String[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = src[i].toPlainString();
+    }
+  }
+
   public static void copy(String[] src, int[] dest, int length) {
     for (int i = 0; i < length; i++) {
       dest[i] = Double.valueOf(src[i]).intValue();
@@ -154,6 +210,12 @@ public class ArrayCopyUtils {
     }
   }
 
+  public static void copy(String[] src, BigDecimal[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = new BigDecimal(src[i]);
+    }
+  }
+
   public static void copy(String[] src, byte[][] dest, int length) {
     for (int i = 0; i < length; i++) {
       dest[i] = BytesUtils.toBytes(src[i]);
@@ -165,4 +227,10 @@ public class ArrayCopyUtils {
       dest[i] = BytesUtils.toHexString(src[i]);
     }
   }
+
+  public static void copy(byte[][] src, BigDecimal[] dest, int length) {
+    for (int i = 0; i < length; i++) {
+      dest[i] = BigDecimalUtils.deserialize(src[i]);
+    }
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BigDecimalUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BigDecimalUtils.java
index 261f865ec0..6827a7c860 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BigDecimalUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BigDecimalUtils.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.utils;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 
 public class BigDecimalUtils {
@@ -58,4 +59,20 @@ public class BigDecimalUtils {
     BigInteger unscaledValue = new BigInteger(unscaledValueBytes);
     return new BigDecimal(unscaledValue, scale);
   }
+
+  /**
+   * Deserializes a big decimal from ByteArray.
+   */
+  public static BigDecimal deserialize(ByteArray byteArray) {
+    return deserialize(byteArray.getBytes());
+  }
+
+  /**
+   * Deserializes a big decimal from the remaining bytes of the ByteBuffer, advancing its position to its limit.
+   */
+  public static BigDecimal deserialize(ByteBuffer byteBuffer) {
+    byte[] bytes = new byte[byteBuffer.remaining()];
+    byteBuffer.get(bytes);
+    return deserialize(bytes);
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 4b62f8a188..2850ea6985 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -556,6 +556,8 @@ public class JsonUtils {
       return DataType.BOOLEAN;
     } else if (jsonNode.isBinary()) {
       return DataType.BYTES;
+    } else if (jsonNode.isBigDecimal()) {
+      return DataType.BIG_DECIMAL;
     } else {
       return DataType.STRING;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org