You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/02 19:13:59 UTC

[nifi] 01/02: NIFI-7369 Adding decimal support for record handling in order to avoid missing precision when reading in records

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 5c2bfcf7d3b8fdafe70ee4920645bb3e8fa5e539
Author: Bence Simon <si...@gmail.com>
AuthorDate: Wed Apr 22 15:48:10 2020 +0200

    NIFI-7369 Adding decimal support for record handling in order to avoid missing precision when reading in records
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../nifi/serialization/record/RecordFieldType.java |  19 ++-
 .../serialization/record/ResultSetRecordSet.java   |  29 +++-
 .../serialization/record/type/DecimalDataType.java |  70 +++++++++
 .../serialization/record/util/DataTypeUtils.java   |  94 +++++++++++-
 .../record/ResultSetRecordSetTest.java             | 165 +++++++++++++++++++++
 .../serialization/record/TestDataTypeUtils.java    | 136 +++++++++++++++++
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   3 +
 .../TestPutElasticsearchHttpRecord.java            |   4 +-
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    |  11 +-
 .../schema/access/InferenceSchemaStrategy.java     |   4 +
 .../org/apache/nifi/avro/TestAvroTypeUtil.java     |  42 +++++-
 .../schema/access/InferenceSchemaStrategyTest.java | 143 ++++++++++++++++++
 .../schema/validation/StandardSchemaValidator.java |  10 +-
 .../validation/TestStandardSchemaValidator.java    |  13 +-
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |   6 +
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java |   3 +
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |  15 ++
 .../org/apache/nifi/processors/orc/PutORCTest.java |  15 +-
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java |  20 +++
 .../nifi/controller/kudu/KuduLookupService.java    |   6 +-
 .../processors/kudu/AbstractKuduProcessor.java     |  38 ++++-
 .../org/apache/nifi/processors/kudu/PutKudu.java   |   9 +-
 .../apache/nifi/processors/kudu/MockPutKudu.java   |  15 +-
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  30 +++-
 .../reporting/prometheus/PrometheusRecordSink.java |   1 +
 .../prometheus/TestPrometheusRecordSink.java       |  16 +-
 .../nifi/rules/handlers/RecordSinkHandler.java     |   5 +
 .../nifi/rules/handlers/TestRecordSinkHandler.java |   4 +
 .../reporting/AbstractSiteToSiteReportingTask.java |   1 +
 .../org/apache/nifi/processors/solr/SolrUtils.java |   5 +-
 .../apache/nifi/processors/solr/SolrUtilsTest.java |  62 ++++++++
 .../org/apache/nifi/queryrecord/FlowFileTable.java |   3 +
 .../nifi-record-serialization-services/pom.xml     |   1 +
 .../apache/nifi/csv/AbstractCSVRecordReader.java   |   1 +
 .../org/apache/nifi/csv/CSVSchemaInference.java    |   5 +
 .../java/org/apache/nifi/csv/WriteCSVResult.java   |   1 +
 .../org/apache/nifi/json/JsonSchemaInference.java  |   8 +
 .../apache/nifi/json/JsonTreeRowRecordReader.java  |   1 +
 .../java/org/apache/nifi/json/WriteJsonResult.java |   3 +
 .../java/org/apache/nifi/xml/WriteXMLResult.java   |   1 +
 .../java/org/apache/nifi/xml/XMLRecordReader.java  |   2 +
 .../nifi/xml/inference/XmlSchemaInference.java     |   5 +
 .../avro/TestAvroReaderWithEmbeddedSchema.java     |   4 +-
 .../org/apache/nifi/avro/TestWriteAvroResult.java  |  57 ++++++-
 .../org/apache/nifi/csv/TestCSVRecordReader.java   |  22 +++
 .../org/apache/nifi/csv/TestWriteCSVResult.java    |   4 +-
 .../apache/nifi/json/TestJsonSchemaInference.java  |   2 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java     |  16 +-
 .../org/apache/nifi/json/TestWriteJsonResult.java  |   2 +
 .../schema/inference/TestFieldTypeInference.java   |  20 +++
 .../org/apache/nifi/xml/TestWriteXMLResult.java    |   5 +-
 .../org/apache/nifi/xml/TestXMLRecordReader.java   |  17 +++
 .../src/test/resources/avro/decimals.avsc          |  36 +++++
 .../src/test/resources/json/output/dataTypes.json  |   1 +
 54 files changed, 1146 insertions(+), 65 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index 18b0781..4e18fb9 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
 
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 
@@ -73,6 +74,11 @@ public enum RecordFieldType {
     DOUBLE("double", FLOAT),
 
     /**
+     * A decimal field type. Fields of this type use a {@code java.math.BigDecimal} value.
+     */
+    DECIMAL("decimal", FLOAT, DOUBLE),
+
+    /**
      * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
      */
     TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
@@ -95,7 +101,7 @@ public enum RecordFieldType {
     /**
      * A String field type. Fields of this type use a {@code java.lang.String} value.
      */
-    STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP),
+    STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP),
 
     /**
      * <p>
@@ -349,6 +355,17 @@ public enum RecordFieldType {
     }
 
     /**
+     * Returns a Data Type that represents a decimal type with the given precision and scale.
+     *
+     * @param precision the precision of the decimal
+     * @param scale the scale of the decimal
+     * @return a DataType that represents a decimal with added information about it's precision and scale.
+     */
+    public DataType getDecimalDataType(final int precision, final int scale) {
+        return new DecimalDataType(precision, scale);
+    }
+
+    /**
      * Determines whether or this this RecordFieldType is "wider" than the provided type. A type "A" is said to be wider
      * than another type "B" iff A encompasses all values of B and more. For example, the LONG type is wider than INT, and INT
      * is wider than SHORT. "Complex" types (MAP, RECORD, ARRAY, CHOICE) are not wider than any other type, and no other type is
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 953b511..3b98657 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Array;
 import java.sql.ResultSet;
@@ -53,6 +54,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final String DATE_CLASS_NAME = Date.class.getName();
     private static final String DOUBLE_CLASS_NAME = Double.class.getName();
     private static final String FLOAT_CLASS_NAME = Float.class.getName();
+    private static final String BIGDECIMAL_CLASS_NAME = BigDecimal.class.getName();
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         this.rs = rs;
@@ -194,6 +196,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
                 return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                final BigDecimal bigDecimal = rs.getBigDecimal(columnIndex);
+                return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
             case Types.OTHER: {
                 // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
                 if (rs.isAfterLast()) {
@@ -212,8 +218,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 final Object obj = rs.getObject(columnIndex);
                 if (!(obj instanceof Record)) {
                     final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
-                        RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME,
-                        RecordFieldType.TIMESTAMP)
+                        RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING,
+                            RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
                     .map(RecordFieldType::getDataType)
                     .collect(Collectors.toList());
 
@@ -234,7 +240,14 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                     }
                 }
 
-                return getFieldType(sqlType, rs.getMetaData().getColumnClassName(columnIndex)).getDataType();
+                final RecordFieldType fieldType = getFieldType(sqlType, rs.getMetaData().getColumnClassName(columnIndex));
+
+                if (RecordFieldType.DECIMAL.equals(fieldType)) {
+                    final BigDecimal bigDecimalValue = rs.getBigDecimal(columnIndex);
+                    return fieldType.getDecimalDataType(bigDecimalValue.precision(), bigDecimalValue.scale());
+                } else {
+                    return fieldType.getDataType();
+                }
             }
         }
     }
@@ -310,6 +323,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             if (valueToLookAt instanceof Double) {
                 return RecordFieldType.DOUBLE.getDataType();
             }
+            if (valueToLookAt instanceof BigDecimal) {
+                final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
+                return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
+            }
             if (valueToLookAt instanceof Boolean) {
                 return RecordFieldType.BOOLEAN.getDataType();
             }
@@ -353,9 +370,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 return RecordFieldType.CHAR;
             case Types.DATE:
                 return RecordFieldType.DATE;
+            case Types.NUMERIC:
             case Types.DECIMAL:
+                return RecordFieldType.DECIMAL;
             case Types.DOUBLE:
-            case Types.NUMERIC:
             case Types.REAL:
                 return RecordFieldType.DOUBLE;
             case Types.FLOAT:
@@ -393,6 +411,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 if (DOUBLE_CLASS_NAME.equals(valueClassName)) {
                     return RecordFieldType.DOUBLE;
                 }
+                if (BIGDECIMAL_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.DECIMAL;
+                }
 
                 return RecordFieldType.RECORD;
             case Types.TIME:
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java
new file mode 100644
index 0000000..4020bcb
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class DecimalDataType extends DataType {
+    private final int precision;
+    private final int scale;
+
+    public DecimalDataType(final int precision, final int scale) {
+        super(RecordFieldType.DECIMAL, null);
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    public int getPrecision() {
+        return precision;
+    }
+
+    public int getScale() {
+        return scale;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 31;
+        hash = 41 * hash + getFieldType().hashCode();
+        hash = 41 * hash + precision;
+        hash = 41 * hash + scale;
+        return hash;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof DecimalDataType)) {
+            return false;
+        }
+
+        final DecimalDataType other = (DecimalDataType) obj;
+        return getPrecision() == other.getPrecision() && getScale() == other.getScale();
+    }
+
+    @Override
+    public String toString() {
+        return getFieldType().toString() + "[" + precision + "," + scale + "]";
+    }
+}
+
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f644fda..e1ee122 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -26,6 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.io.InputStream;
 import java.io.Reader;
 import java.lang.reflect.Array;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -93,7 +95,14 @@ public class DataTypeUtils {
             "(" + Base10Decimal + OptionalBase10Exponent + ")" +
         ")";
 
+    private static final String decimalRegex =
+        OptionalSign +
+            "(" + Base10Digits + OptionalBase10Decimal + ")" + "|" +
+            "(" + Base10Digits + OptionalBase10Decimal + Base10Exponent + ")" + "|" +
+            "(" + Base10Decimal + OptionalBase10Exponent + ")";
+
     private static final Pattern FLOATING_POINT_PATTERN = Pattern.compile(doubleRegex);
+    private static final Pattern DECIMAL_PATTERN = Pattern.compile(decimalRegex);
 
     private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
 
@@ -127,6 +136,7 @@ public class DataTypeUtils {
         NUMERIC_VALIDATORS.put(RecordFieldType.SHORT, value -> value instanceof Short);
         NUMERIC_VALIDATORS.put(RecordFieldType.DOUBLE, value -> value instanceof Double);
         NUMERIC_VALIDATORS.put(RecordFieldType.FLOAT, value -> value instanceof Float);
+        NUMERIC_VALIDATORS.put(RecordFieldType.DECIMAL, value -> value instanceof BigDecimal);
     }
 
     public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
@@ -174,6 +184,8 @@ public class DataTypeUtils {
                 return toCharacter(value, fieldName);
             case DATE:
                 return toDate(value, dateFormat, fieldName);
+            case DECIMAL:
+                return toBigDecimal(value, fieldName);
             case DOUBLE:
                 return toDouble(value, fieldName);
             case FLOAT:
@@ -228,6 +240,8 @@ public class DataTypeUtils {
                 return isCharacterTypeCompatible(value);
             case DATE:
                 return isDateTypeCompatible(value, dataType.getFormat());
+            case DECIMAL:
+                return isDecimalTypeCompatible(value);
             case DOUBLE:
                 return isDoubleTypeCompatible(value);
             case FLOAT:
@@ -483,6 +497,10 @@ public class DataTypeUtils {
             if (value instanceof BigInteger) {
                 return RecordFieldType.BIGINT.getDataType();
             }
+            if (value instanceof BigDecimal) {
+                final BigDecimal bigDecimal = (BigDecimal) value;
+                return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
+            }
         }
 
         if (value instanceof Boolean) {
@@ -1232,6 +1250,10 @@ public class DataTypeUtils {
         return isNumberTypeCompatible(value, DataTypeUtils::isIntegral);
     }
 
+    public static boolean isDecimalTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, DataTypeUtils::isDecimal);
+    }
+
     public static Boolean toBoolean(final Object value, final String fieldName) {
         if (value == null) {
             return null;
@@ -1266,6 +1288,50 @@ public class DataTypeUtils {
         return false;
     }
 
+    public static BigDecimal toBigDecimal(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigDecimal) {
+            return (BigDecimal) value;
+        }
+
+        if (value instanceof Number) {
+            final Number number = (Number) value;
+
+            if (number instanceof Byte
+                    || number instanceof Short
+                    || number instanceof Integer
+                    || number instanceof Long) {
+                return BigDecimal.valueOf(number.longValue());
+            }
+
+            if (number instanceof BigInteger) {
+                return new BigDecimal((BigInteger) number);
+            }
+
+            if (number instanceof Float) {
+                return new BigDecimal(Float.toString((Float) number));
+            }
+
+            if (number instanceof Double) {
+                return new BigDecimal(Double.toString((Double) number));
+            }
+        }
+
+        if (value instanceof String) {
+            try {
+                return new BigDecimal((String) value);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigDecimal for field " + fieldName
+                        + ", value is not a valid representation of BigDecimal", nfe);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigDecimal for field " + fieldName);
+    }
+
     public static Double toDouble(final Object value, final String fieldName) {
         if (value == null) {
             return null;
@@ -1322,6 +1388,14 @@ public class DataTypeUtils {
         return isNumberTypeCompatible(value, s -> isFloatingPoint(s));
     }
 
+    private static boolean isDecimal(final String value) {
+        if (value == null || value.isEmpty()) {
+            return false;
+        }
+
+        return DECIMAL_PATTERN.matcher(value).matches();
+    }
+
     private static boolean isFloatingPoint(final String value) {
         if (value == null || value.isEmpty()) {
             return false;
@@ -1691,15 +1765,31 @@ public class DataTypeUtils {
             case FLOAT:
                 if (otherFieldType == RecordFieldType.DOUBLE) {
                     return Optional.of(otherDataType);
+                } else if (otherFieldType == RecordFieldType.DECIMAL) {
+                    return Optional.of(otherDataType);
                 }
                 break;
             case DOUBLE:
                 if (otherFieldType == RecordFieldType.FLOAT) {
                     return Optional.of(thisDataType);
+                } else if (otherFieldType == RecordFieldType.DECIMAL) {
+                    return Optional.of(otherDataType);
                 }
                 break;
+            case DECIMAL:
+                if (otherFieldType == RecordFieldType.DOUBLE) {
+                    return Optional.of(thisDataType);
+                } else if (otherFieldType == RecordFieldType.FLOAT) {
+                    return Optional.of(thisDataType);
+                } else if (otherFieldType == RecordFieldType.DECIMAL) {
+                    final DecimalDataType thisDecimalDataType = (DecimalDataType) thisDataType;
+                    final DecimalDataType otherDecimalDataType = (DecimalDataType) otherDataType;
 
-
+                    final int precision = Math.max(thisDecimalDataType.getPrecision(), otherDecimalDataType.getPrecision());
+                    final int scale = Math.max(thisDecimalDataType.getScale(), otherDecimalDataType.getScale());
+                    return Optional.of(RecordFieldType.DECIMAL.getDecimalDataType(precision, scale));
+                }
+                break;
             case CHAR:
                 if (otherFieldType == RecordFieldType.STRING) {
                     return Optional.of(otherDataType);
@@ -1759,6 +1849,8 @@ public class DataTypeUtils {
                 return Types.DOUBLE;
             case FLOAT:
                 return Types.FLOAT;
+            case DECIMAL:
+                return Types.NUMERIC;
             case INT:
                 return Types.INTEGER;
             case SHORT:
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
new file mode 100644
index 0000000..5c9a39c
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ResultSetRecordSetTest {
+    private static final Object[][] COLUMNS = new Object[][] {
+            // column number; column label / name / schema field; column type; schema data type;
+            {1, "varchar", Types.VARCHAR, RecordFieldType.STRING.getDataType()},
+            {2, "bigint", Types.BIGINT, RecordFieldType.LONG.getDataType()},
+            {3, "rowid", Types.ROWID, RecordFieldType.LONG.getDataType()},
+            {4, "bit", Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
+            {5, "boolean", Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
+            {6, "char", Types.CHAR, RecordFieldType.CHAR.getDataType()},
+            {7, "date", Types.DATE, RecordFieldType.DATE.getDataType()},
+            {8, "integer", Types.INTEGER, RecordFieldType.INT.getDataType()},
+            {9, "double", Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
+            {10, "real", Types.REAL, RecordFieldType.DOUBLE.getDataType()},
+            {11, "float", Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
+            {12, "smallint", Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
+            {13, "tinyint", Types.TINYINT, RecordFieldType.BYTE.getDataType()},
+            {14, "bigDecimal1", Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
+            {15, "bigDecimal2", Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
+            {16, "bigDecimal3", Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
+    };
+
+    @Mock
+    private ResultSet resultSet;
+
+    @Mock
+    private ResultSetMetaData resultSetMetaData;
+
+    @Before
+    public void setUp() throws SQLException {
+        Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
+
+        for (final Object[] column : COLUMNS) {
+            Mockito.when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((column[1]) + "Col");
+            Mockito.when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
+            Mockito.when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);
+        }
+
+        // Big decimal values are necessary in order to determine precision and scale
+        Mockito.when(resultSet.getBigDecimal(14)).thenReturn(BigDecimal.valueOf(1234.567D));
+        Mockito.when(resultSet.getBigDecimal(15)).thenReturn(BigDecimal.valueOf(1234L));
+        Mockito.when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
+
+        // This will be handled by a dedicated branch for Java Objects, needs some further details
+        Mockito.when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
+    }
+
+    @Test
+    public void testCreateSchema() throws SQLException {
+        // given
+        final RecordSchema recordSchema = givenRecordSchema();
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllColumnDataTypesAreCorrect(resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenNoRecordSchema() throws SQLException {
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, null);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllColumnDataTypesAreCorrect(resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherType() throws SQLException {
+        // given
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("column", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final ResultSet resultSet = givenResultSetForOther();
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        Assert.assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherTypeWithoutSchema() throws SQLException {
+        // given
+        final ResultSet resultSet = givenResultSetForOther();
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, null);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        Assert.assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
+    }
+
+    private ResultSet givenResultSetForOther() throws SQLException {
+        final ResultSet resultSet = Mockito.mock(ResultSet.class);
+        final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
+        Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(1);
+        Mockito.when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
+        Mockito.when(resultSetMetaData.getColumnName(1)).thenReturn("column");
+        Mockito.when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
+        return resultSet;
+    }
+
+    private RecordSchema givenRecordSchema() {
+        final List<RecordField> fields = new ArrayList<>();
+
+        for (final Object[] column : COLUMNS) {
+            fields.add(new RecordField((String) column[1], (DataType) column[3]));
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) {
+        Assert.assertNotNull(resultSchema);
+
+        for (final Object[] column : COLUMNS) {
+            Assert.assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 6d35789..d82be97 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -24,9 +24,11 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.DoubleAdder;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -266,6 +269,113 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testConvertToBigDecimalWhenInputIsValid() {
+        // given
+        final BigDecimal expectedValue = BigDecimal.valueOf(12L);
+
+        // when & then
+        whenExpectingValidBigDecimalConversion(expectedValue, BigDecimal.valueOf(12L));
+        whenExpectingValidBigDecimalConversion(expectedValue, (byte) 12);
+        whenExpectingValidBigDecimalConversion(expectedValue, (short) 12);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12L);
+        whenExpectingValidBigDecimalConversion(expectedValue, BigInteger.valueOf(12L));
+        whenExpectingValidBigDecimalConversion(expectedValue, 12F);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12.000F);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12D);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12.000D);
+        whenExpectingValidBigDecimalConversion(expectedValue, "12");
+        whenExpectingValidBigDecimalConversion(expectedValue, "12.000");
+    }
+
+    @Test
+    public void testConvertToBigDecimalWhenNullInput() {
+        assertNull(DataTypeUtils.convertType(null, RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8));
+    }
+
+    @Test(expected = IllegalTypeConversionException.class)
+    public void testConvertToBigDecimalWhenInputStringIsInvalid() {
+        DataTypeUtils.convertType("test", RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+    }
+
+    @Test(expected = IllegalTypeConversionException.class)
+    public void testConvertToBigDecimalWhenUnsupportedType() {
+        DataTypeUtils.convertType(new ArrayList<Double>(), RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+    }
+
+    @Test(expected = IllegalTypeConversionException.class)
+    public void testConvertToBigDecimalWhenUnsupportedNumberType() {
+        DataTypeUtils.convertType(new DoubleAdder(), RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCompatibleDataTypeBigDecimal() {
+        // given
+        final DataType dataType = RecordFieldType.DECIMAL.getDecimalDataType(30, 10);
+
+        // when & then
+        assertTrue(DataTypeUtils.isCompatibleDataType(new BigDecimal("1.2345678901234567890"), dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(new BigInteger("12345678901234567890"), dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(1234567890123456789L, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(1, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType((byte) 1, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType((short) 1, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType("1.2345678901234567890", dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(3.1F, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(3.0D, dataType));
+        assertFalse(DataTypeUtils.isCompatibleDataType("1234567XYZ", dataType));
+        assertFalse(DataTypeUtils.isCompatibleDataType(new Long[]{1L, 2L}, dataType));
+    }
+
+    @Test
+    public void testInferDataTypeWithBigDecimal() {
+        assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(3, 1), DataTypeUtils.inferDataType(BigDecimal.valueOf(12.3D), null));
+    }
+
+    @Test
+    public void testIsBigDecimalTypeCompatible() {
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible((byte) 13));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible((short) 13));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12L));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigInteger.valueOf(12L)));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123F));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123D));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigDecimal.valueOf(12.123D)));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible("123"));
+
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible(null));
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible("test"));
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible(new ArrayList<>()));
+        // Decimal handling does not support NaN and Infinity as the underlying BigDecimal is unable to parse
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible("NaN"));
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible("Infinity"));
+    }
+
+    @Test
+    public void testGetSQLTypeValueWithBigDecimal() {
+        assertEquals(Types.NUMERIC, DataTypeUtils.getSQLTypeValue(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+    }
+
+    @Test
+    public void testChooseDataTypeWhenExpectedIsBigDecimal() {
+        // GIVEN
+        final List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType(),
+                RecordFieldType.DECIMAL.getDecimalDataType(2, 1),
+                RecordFieldType.DECIMAL.getDecimalDataType(20, 10)
+        );
+
+        final Object value = new BigDecimal("1.2");
+        final DataType expected = RecordFieldType.DECIMAL.getDecimalDataType(2, 1);
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
     public void testFloatingPointCompatibility() {
         final String[] prefixes = new String[] {"", "-", "+"};
         final String[] exponents = new String[] {"e0", "e1", "e-1", "E0", "E1", "E-1"};
@@ -504,6 +614,11 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testFindMostSuitableTypeWithBigDecimal() {
+        testFindMostSuitableType(BigDecimal.valueOf(123.456D), RecordFieldType.DECIMAL.getDecimalDataType(6, 3));
+    }
+
+    @Test
     public void testFindMostSuitableTypeWithFloat() {
         testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType());
     }
@@ -580,6 +695,27 @@ public class TestDataTypeUtils {
         });
     }
 
+    private void whenExpectingValidBigDecimalConversion(final BigDecimal expectedValue, final Object incomingValue) {
+        // Checking indirect conversion
+        final String failureMessage = "Conversion from " + incomingValue.getClass().getSimpleName() + " to " + expectedValue.getClass().getSimpleName() + " failed, when ";
+        final BigDecimal indirectResult = whenExpectingValidConversion(expectedValue, incomingValue, RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+        // In some cases, direct equality check comes with false negative as the changing representation brings in
+        // insignificant changes what might break the comparison. For example 12F will be represented as "12.0"
+        assertEquals(failureMessage + "indirect", 0, expectedValue.compareTo(indirectResult));
+
+        // Checking direct conversion
+        final BigDecimal directResult = DataTypeUtils.toBigDecimal(incomingValue, "field");
+        assertEquals(failureMessage + "direct", 0, expectedValue.compareTo(directResult));
+
+    }
+
+    private <T> T whenExpectingValidConversion(final T expectedValue, final Object incomingValue, final DataType dataType) {
+        final Object result = DataTypeUtils.convertType(incomingValue, dataType, null, StandardCharsets.UTF_8);
+        assertNotNull(result);
+        assertTrue(expectedValue.getClass().isInstance(result));
+        return (T) result;
+    }
+
     @Test
     public void testIsIntegerFitsToFloat() {
         final int maxRepresentableInt = Double.valueOf(Math.pow(2, 24)).intValue();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index f1c45a0..2de4281 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -694,6 +694,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     generator.writeNumber((BigInteger) coercedValue);
                 }
                 break;
+            case DECIMAL:
+                generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
+                break;
             case BOOLEAN:
                 final String stringValue = coercedValue.toString();
                 if ("true".equalsIgnoreCase(stringValue)) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index afe2055..fa133b4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -41,6 +41,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.ConnectException;
 import java.sql.Date;
 import java.sql.Time;
@@ -664,9 +665,10 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("date", RecordFieldType.DATE);
         parser.addSchemaField("time", RecordFieldType.TIME);
         parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
+        parser.addSchemaField("amount", RecordFieldType.DECIMAL);
 
         for(int i=1; i<=numRecords; i++) {
-            parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+            parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L), new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 950e4cb..f54d329 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -256,6 +257,11 @@ public class AvroTypeUtil {
             case LONG:
                 schema = Schema.create(Type.LONG);
                 break;
+            case DECIMAL:
+                final DecimalDataType decimalDataType = (DecimalDataType) dataType;
+                schema = Schema.create(Type.BYTES);
+                LogicalTypes.decimal(decimalDataType.getPrecision(), decimalDataType.getScale()).addToSchema(schema);
+                break;
             case MAP:
                 schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, false));
                 break;
@@ -341,9 +347,8 @@ public class AvroTypeUtil {
                 case LOGICAL_TYPE_TIMESTAMP_MICROS:
                     return RecordFieldType.TIMESTAMP.getDataType();
                 case LOGICAL_TYPE_DECIMAL:
-                    // We convert Decimal to Double.
-                    // Alternatively we could convert it to String, but numeric type is generally more preferable by users.
-                    return RecordFieldType.DOUBLE.getDataType();
+                    final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+                    return RecordFieldType.DECIMAL.getDecimalDataType(decimal.getPrecision(), decimal.getScale());
             }
         }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
index 1b80886..1c12926 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumSet;
@@ -60,6 +61,9 @@ public class InferenceSchemaStrategy implements JsonSchemaAccessStrategy {
                 field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType());
             } else if (entry.getValue() instanceof Date) {
                 field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType());
+            } else if (entry.getValue() instanceof BigDecimal) {
+                final BigDecimal bigDecimal = (BigDecimal) entry.getValue();
+                field = new RecordField(entry.getKey(), RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale()));
             } else if (entry.getValue() instanceof List) {
                 field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType());
             } else if (entry.getValue() instanceof Map) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 412d573..6cc6f4f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -78,6 +78,7 @@ public class TestAvroTypeUtil {
         fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType()));
         fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType()));
         fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType()));
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
         fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
         fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
         fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
@@ -117,6 +118,7 @@ public class TestAvroTypeUtil {
         assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get());
         assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get());
         assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get());
+        assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), afterConversion.getDataType("decimal").get());
         assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get());
         assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get());
         assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get());
@@ -411,7 +413,7 @@ public class TestAvroTypeUtil {
     public void testConvertAvroRecordToMapWithFieldTypeOfFixedAndLogicalTypeDecimal() {
        // Create a field schema like {"type":"fixed","name":"amount","size":16,"logicalType":"decimal","precision":18,"scale":8}
        final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
-        final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);;
+        final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);
         decimalType.addToSchema(fieldSchema);
 
         // Create a field named "amount" using the field schema above
@@ -421,24 +423,50 @@ public class TestAvroTypeUtil {
         final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
 
         // Create an example Avro record with the amount field of type fixed and a logical type of decimal
-        final BigDecimal expectedDecimalValue = new BigDecimal("1234567890.12345678");
+        final BigDecimal expectedBigDecimal = new BigDecimal("1234567890.12345678");
         final GenericRecord genericRecord = new GenericData.Record(avroSchema);
-        genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedDecimalValue, fieldSchema, decimalType));
+        genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedBigDecimal, fieldSchema, decimalType));
 
         // Convert the Avro schema to a Record schema
+        thenConvertAvroSchemaToRecordSchema(avroSchema, expectedBigDecimal, genericRecord);
+    }
+
+    @Test
+    public void testConvertAvroRecordToMapWithFieldTypeOfBinaryAndLogicalTypeDecimal() {
+        // Create a field schema like {"type":"binary","name":"amount","logicalType":"decimal","precision":18,"scale":8}
+        final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
+        final Schema fieldSchema = Schema.create(Type.BYTES);
+        decimalType.addToSchema(fieldSchema);
+
+        // Create a field named "amount" using the field schema above
+        final Schema.Field field = new Schema.Field("amount", fieldSchema, null, (Object)null);
+
+        // Create an overall record schema with the amount field
+        final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
+
+        // Create an example Avro record with the amount field of type binary and a logical type of decimal
+        final BigDecimal expectedBigDecimal = new BigDecimal("1234567890.12345678");
+        final GenericRecord genericRecord = new GenericData.Record(avroSchema);
+        genericRecord.put("amount", new Conversions.DecimalConversion().toBytes(expectedBigDecimal, fieldSchema, decimalType));
+
+        // Convert the Avro schema to a Record schema
+        thenConvertAvroSchemaToRecordSchema(avroSchema, expectedBigDecimal, genericRecord);
+    }
+
+    private void thenConvertAvroSchemaToRecordSchema(Schema avroSchema, BigDecimal expectedBigDecimal, GenericRecord genericRecord) {
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
 
         // Convert the Avro record a Map and verify the object produced is the same BigDecimal that was converted to fixed
-        final Map<String,Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
+        final Map<String, Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
         assertNotNull(convertedMap);
         assertEquals(1, convertedMap.size());
 
         final Object resultObject = convertedMap.get("amount");
         assertNotNull(resultObject);
-        assertTrue(resultObject instanceof Double);
+        assertTrue(resultObject instanceof BigDecimal);
 
-        final Double resultDouble = (Double) resultObject;
-        assertEquals(Double.valueOf(expectedDecimalValue.doubleValue()), resultDouble);
+        final BigDecimal resultBigDecimal = (BigDecimal) resultObject;
+        assertEquals(expectedBigDecimal, resultBigDecimal);
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
new file mode 100644
index 0000000..e2d82a9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class InferenceSchemaStrategyTest {
+
+    private static final Object[][] CONTENT_FIELDS = new Object[][] {
+            {"integer", 1, RecordFieldType.INT.getDataType()},
+            {"long", 1L, RecordFieldType.LONG.getDataType()},
+            {"boolean", true, RecordFieldType.BOOLEAN.getDataType()},
+            {"double", 1D, RecordFieldType.DOUBLE.getDataType()},
+            {"date", new Date(), RecordFieldType.DATE.getDataType()},
+            {"decimal", BigDecimal.valueOf(123.456D), RecordFieldType.DECIMAL.getDecimalDataType(6, 3)},
+            {"array", new ArrayList<String>(), RecordFieldType.ARRAY.getDataType()},
+
+            // date subclasses
+            {"time", new Time(System.currentTimeMillis()), RecordFieldType.DATE.getDataType()},
+            {"timestamp", new Timestamp(System.currentTimeMillis()), RecordFieldType.DATE.getDataType()},
+
+            // others are considered as string
+            {"byte", (byte) 1, RecordFieldType.STRING.getDataType()},
+            {"short", (short) 1, RecordFieldType.STRING.getDataType()},
+            {"bigint", BigInteger.ONE, RecordFieldType.STRING.getDataType()},
+            {"float", (float) 1, RecordFieldType.STRING.getDataType()},
+            {"char", (char) 1, RecordFieldType.STRING.getDataType()},
+    };
+
+    private final InferenceSchemaStrategy testSubject = new InferenceSchemaStrategy();
+
+    @Test
+    public void testSchemaConversion() throws Exception {
+        // when
+        final RecordSchema result = testSubject.getSchema(null, givenContent(), null);
+
+        // then
+        Assert.assertNotNull(result);
+        thenFieldsAreConvertedProperly(result, true);
+    }
+
+    @Test
+    public void testSchemaConversionWhenMap() throws Exception {
+        // given
+        final Map<String, Object> input = new HashMap<>();
+        final Map<String, Integer> field = new HashMap<>();
+        field.put("a1", 1);
+        field.put("a2", 2);
+        input.put("f1", field);
+
+        // when
+        final RecordSchema result = testSubject.getSchema(null, input, null);
+
+        // then
+        Assert.assertNotNull(result);
+        Assert.assertTrue(RecordDataType.class.isInstance(result.getField("f1").get().getDataType()));
+        final RecordDataType recordDataType = (RecordDataType) result.getField("f1").get().getDataType();
+
+        final RecordSchema childSchema = recordDataType.getChildSchema();
+        Assert.assertNotNull(childSchema);
+        Assert.assertEquals(RecordFieldType.INT.getDataType(), childSchema.getField("a1").get().getDataType());
+        Assert.assertEquals(RecordFieldType.INT.getDataType(), childSchema.getField("a2").get().getDataType());
+    }
+
+    @Test
+    public void testSchemaConversionFromJsonString() throws Exception {
+        // given
+        final String json = "{\"double\":1.0,\"integer\":1,\"long\":9223372036854775,\"boolean\":true,\"array\":[]}";
+
+        // when
+        final RecordSchema result = testSubject.getSchema(  null, new ByteArrayInputStream(json.getBytes()), null);
+
+        // then
+        Assert.assertNotNull(result);
+        thenFieldsAreConvertedProperly(result, false);
+    }
+
+    private Map<String, Object> givenContent() {
+        final HashMap<String, Object> result = new HashMap<>();
+
+        for (final Object[] contentField : CONTENT_FIELDS) {
+            result.put((String) contentField[0], contentField[1]);
+        }
+
+        return result;
+    }
+
+    private Map<String, DataType> givenExpected() {
+        final HashMap<String, DataType> result = new HashMap<>();
+
+        for (final Object[] contentField : CONTENT_FIELDS) {
+            result.put((String) contentField[0], (DataType) contentField[2]);
+        }
+
+        return result;
+    }
+
+    private void thenFieldsAreConvertedProperly(final RecordSchema result, final boolean mustPresent) {
+        final List<RecordField> fields = result.getFields();
+
+        for (final Map.Entry<String, DataType> expected : givenExpected().entrySet()) {
+            final Optional<RecordField> field = fields.stream().filter(f -> f.getFieldName().equals(expected.getKey())).findFirst();
+
+            if (field.isPresent()) {
+                Assert.assertEquals("\"" + expected.getKey() + "\" is expected to be converted " + expected.getValue().toString(), expected.getValue(), field.get().getDataType());
+            } else if (mustPresent) {
+                Assert.fail();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
index 76e7b7d..18b7c7a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
@@ -32,11 +32,10 @@ import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
 import org.apache.nifi.serialization.record.validation.ValidationError;
 import org.apache.nifi.serialization.record.validation.ValidationErrorType;
 
+import java.math.BigInteger;
 import java.util.Map;
 
 public class StandardSchemaValidator implements RecordSchemaValidator {
-
-
     private final SchemaValidationContext validationContext;
 
     public StandardSchemaValidator(final SchemaValidationContext validationContext) {
@@ -273,6 +272,13 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
                         || DataTypeUtils.isIntegerFitsToFloat(value)
                         || DataTypeUtils.isLongFitsToFloat(value)
                         || DataTypeUtils.isBigIntFitsToFloat(value);
+            case DECIMAL:
+                return DataTypeUtils.isFittingNumberType(value, dataType.getFieldType())
+                        || value instanceof Byte
+                        || value instanceof Short
+                        || value instanceof Integer
+                        || value instanceof Long
+                        || value instanceof BigInteger;
         }
 
         return false;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
index dabfa9f..3f4e102 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
@@ -67,7 +68,8 @@ public class TestStandardSchemaValidator {
             RecordFieldType.LONG,
             RecordFieldType.BIGINT,
             RecordFieldType.FLOAT,
-            RecordFieldType.DOUBLE
+            RecordFieldType.DOUBLE,
+            RecordFieldType.DECIMAL
     ));
 
     @Test
@@ -117,6 +119,7 @@ public class TestStandardSchemaValidator {
         valueMap.put("short", (short) 8);
         valueMap.put("int", 9);
         valueMap.put("bigint", BigInteger.valueOf(8L));
+        valueMap.put("decimal", BigDecimal.valueOf(8.1D));
         valueMap.put("long", 8L);
         valueMap.put("float", 8.0F);
         valueMap.put("double", 8.0D);
@@ -145,6 +148,9 @@ public class TestStandardSchemaValidator {
 
         valueMap.put("float_as_double", 8.0F);
 
+        valueMap.put("float_as_decimal", 8.0F);
+        valueMap.put("double_as_decimal", 8.0D);
+
         final Record record = new MapRecord(schema, valueMap);
 
         final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true);
@@ -170,18 +176,21 @@ public class TestStandardSchemaValidator {
     public void testByteIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.DECIMAL);
     }
 
     @Test
     public void testShortIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.DECIMAL);
     }
 
     @Test
     public void testIntegerWithinRangeIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_FLOAT.intValue(), RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Integer.MAX_VALUE, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Integer.MAX_VALUE, RecordFieldType.DECIMAL);
     }
 
     @Test
@@ -194,6 +203,7 @@ public class TestStandardSchemaValidator {
     public void testLongWithinRangeIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_FLOAT, RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_DOUBLE, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Long.MAX_VALUE, RecordFieldType.DECIMAL);
     }
 
     @Test
@@ -206,6 +216,7 @@ public class TestStandardSchemaValidator {
     public void testBigintWithinRangeIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(5L), RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(5L), RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(Long.MAX_VALUE), RecordFieldType.DECIMAL);
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index ce06f82..b782534 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -22,8 +22,10 @@ import org.apache.avro.util.Utf8;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
@@ -44,6 +46,7 @@ import org.apache.hadoop.io.Text;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -104,6 +107,9 @@ public class NiFiOrcUtils {
             if (o instanceof Double) {
                 return new DoubleWritable((double) o);
             }
+            if (o instanceof BigDecimal) {
+                return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o));
+            }
             if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
                 return new Text(o.toString());
             }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index cd7847f..74cdd2a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -22,6 +22,7 @@ import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -33,6 +34,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -199,6 +201,7 @@ public class TestNiFiOrcUtils {
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L) instanceof LongWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f) instanceof FloatWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0) instanceof DoubleWritable);
+        assertTrue(NiFiOrcUtils.convertToORCObject(null, BigDecimal.valueOf(1.0D)) instanceof HiveDecimalWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}) instanceof List);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3)) instanceof List);
         Map<String, Float> map = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index f726010..8af55d5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hive.ql.io.orc;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -45,6 +47,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.orc.MemoryManager;
@@ -52,6 +55,7 @@ import org.apache.orc.OrcConf;
 import org.apache.orc.impl.MemoryManagerImpl;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -104,6 +108,9 @@ public class NiFiOrcUtils {
             if (o instanceof Double) {
                 return new DoubleWritable((double) o);
             }
+            if (o instanceof BigDecimal) {
+                return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o));
+            }
             if (o instanceof String) {
                 return new Text(o.toString());
             }
@@ -286,6 +293,11 @@ public class NiFiOrcUtils {
                 || RecordFieldType.STRING.equals(fieldType)) {
             return getPrimitiveOrcTypeFromPrimitiveFieldType(dataType);
         }
+
+        if (RecordFieldType.DECIMAL.equals(fieldType)) {
+            DecimalDataType decimalDataType = (DecimalDataType) dataType;
+            return TypeInfoFactory.getDecimalTypeInfo(decimalDataType.getPrecision(), decimalDataType.getScale());
+        }
         if (RecordFieldType.DATE.equals(fieldType)) {
             return TypeInfoFactory.dateTypeInfo;
         }
@@ -407,6 +419,9 @@ public class NiFiOrcUtils {
         if (RecordFieldType.FLOAT.equals(dataType)) {
             return "FLOAT";
         }
+        if (RecordFieldType.DECIMAL.equals(dataType)) {
+            return "DECIMAL";
+        }
         if (RecordFieldType.STRING.equals(dataType)) {
             return "STRING";
         }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index a6d0214..b8ed6c2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -66,6 +67,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -75,7 +77,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoField;
-import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -126,7 +127,7 @@ public class PutORCTest {
 
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
         for (final RecordField recordField : recordSchema.getFields()) {
-            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+            readerFactory.addSchemaField(recordField);
         }
 
         if (recordGenerator == null) {
@@ -222,16 +223,14 @@ public class PutORCTest {
     public void testWriteORCWithAvroLogicalTypes() throws IOException, InitializationException {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_logical_types.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);
-        Calendar now = Calendar.getInstance();
         LocalTime nowTime = LocalTime.now();
         LocalDateTime nowDateTime = LocalDateTime.now();
-        LocalDate epoch = LocalDate.ofEpochDay(0);
         LocalDate nowDate = LocalDate.now();
 
         final int timeMillis = nowTime.get(ChronoField.MILLI_OF_DAY);
         final Timestamp timestampMillis = Timestamp.valueOf(nowDateTime);
         final Date dt = Date.valueOf(nowDate);
-        final double dec = 1234.56;
+        final BigDecimal bigDecimal = new BigDecimal("92.12");
 
         configure(proc, 10, (numUsers, readerFactory) -> {
             for (int i = 0; i < numUsers; i++) {
@@ -240,7 +239,7 @@ public class PutORCTest {
                         timeMillis,
                         timestampMillis,
                         dt,
-                        dec);
+                        bigDecimal);
             }
             return null;
         });
@@ -265,7 +264,7 @@ public class PutORCTest {
         mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
         // DDL will be created with field names normalized (lowercased, e.g.) for Hive by default
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
-                "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DOUBLE) STORED AS ORC");
+                "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED AS ORC");
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
@@ -285,7 +284,7 @@ public class PutORCTest {
                     final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                     noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
                     assertEquals(noTimeOfDayDateFormat.format(dt), ((DateWritableV2) x.get(3)).get().toString());
-                    assertEquals(dec, ((DoubleWritable) x.get(4)).get(), Double.MIN_VALUE);
+                    assertEquals(bigDecimal, ((HiveDecimalWritable) x.get(4)).getHiveDecimal().bigDecimalValue());
                     return null;
                 }
         );
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index bde6af8..19580ea 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -21,7 +21,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -31,13 +33,16 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -170,6 +175,20 @@ public class TestNiFiOrcUtils {
         assertEquals(TypeInfoCreator.createString(), orcType);
     }
 
+
+    @Test
+    public void test_getOrcField_decimal() {
+        // given
+        final DecimalTypeInfo expected = TypeInfoFactory.getDecimalTypeInfo(4, 2);
+        final DataType decimalDataType = RecordFieldType.DECIMAL.getDecimalDataType(4, 2);
+
+        // when
+        final TypeInfo orcField = NiFiOrcUtils.getOrcField(decimalDataType, false);
+
+        // then
+        Assert.assertEquals(expected, orcField);
+    }
+
     @Test
     public void test_getPrimitiveOrcTypeFromPrimitiveFieldType() {
         // Expected ORC types
@@ -205,6 +224,7 @@ public class TestNiFiOrcUtils {
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L, true) instanceof LongWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f, true) instanceof FloatWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0, true) instanceof DoubleWritable);
+        assertTrue(NiFiOrcUtils.convertToORCObject(null, BigDecimal.valueOf(1L), true) instanceof HiveDecimalWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}, true) instanceof List);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3), true) instanceof List);
         Map<String, Float> map = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index b044f9a..bc59b12 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.kudu;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.AsyncKuduClient;
@@ -311,12 +312,15 @@ public class KuduLookupService extends AbstractControllerService implements Reco
                 case INT64:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()));
                     break;
+                case DECIMAL:
+                    final ColumnTypeAttributes attributes = cs.getTypeAttributes();
+                    fields.add(new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale())));
+                    break;
                 case UNIXTIME_MICROS:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()));
                     break;
                 case BINARY:
                 case STRING:
-                case DECIMAL:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
                     break;
                 case DOUBLE:
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index c41c8d5..b9639e5 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -18,8 +18,10 @@
 package org.apache.nifi.processors.kudu;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.AsyncKuduClient;
 import org.apache.kudu.client.Delete;
 import org.apache.kudu.client.Insert;
@@ -53,6 +55,8 @@ import org.apache.nifi.security.krb.KerberosPasswordUser;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StringUtils;
 
@@ -369,7 +373,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
     /**
      * Converts a NiFi DataType to it's equivalent Kudu Type.
      */
-    protected Type toKuduType(DataType nifiType) {
+    private Type toKuduType(DataType nifiType) {
         switch (nifiType.getFieldType()) {
             case BOOLEAN:
                 return Type.BOOL;
@@ -385,6 +389,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                 return Type.FLOAT;
             case DOUBLE:
                 return Type.DOUBLE;
+            case DECIMAL:
+                return Type.DECIMAL;
             case TIMESTAMP:
                 return Type.UNIXTIME_MICROS;
             case CHAR:
@@ -395,6 +401,36 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         }
     }
 
+    private ColumnTypeAttributes getKuduTypeAttributes(final DataType nifiType) {
+        if (nifiType.getFieldType().equals(RecordFieldType.DECIMAL)) {
+            final DecimalDataType decimalDataType = (DecimalDataType) nifiType;
+            return new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(decimalDataType.getPrecision()).scale(decimalDataType.getScale()).build();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Based on NiFi field declaration, generates an alter statement to extend table with new column. Note: simply calling
+     * {@link AlterTableOptions#addNullableColumn(String, Type)} is not sufficient as it does not cover BigDecimal scale and precision handling.
+     *
+     * @param columnName Name of the new table column.
+     * @param nifiType Type of the field.
+     *
+     * @return Alter table statement to extend table with the new field.
+     */
+    protected AlterTableOptions getAddNullableColumnStatement(final String columnName, final DataType nifiType) {
+        final AlterTableOptions alterTable = new AlterTableOptions();
+
+        alterTable.addColumn(new ColumnSchema.ColumnSchemaBuilder(columnName, toKuduType(nifiType))
+                .nullable(true)
+                .defaultValue(null)
+                .typeAttributes(getKuduTypeAttributes(nifiType))
+                .build());
+
+        return alterTable;
+    }
+
     private int getColumnIndex(Schema columns, String colName) {
         try {
             return columns.getColumnIndex(colName);
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 2ac0195..c0d7e46 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -18,11 +18,10 @@
 package org.apache.nifi.processors.kudu;
 
 import org.apache.kudu.Schema;
-import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.OperationResponse;
 import org.apache.kudu.client.RowError;
@@ -325,10 +324,8 @@ public class PutKudu extends AbstractKuduProcessor {
                         // we created by a concurrent thread or application attempting to handle schema drift.
                         for (RecordField field : missing) {
                             try {
-                                String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
-                                AlterTableOptions alter = new AlterTableOptions();
-                                alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
-                                kuduClient.alterTable(tableName, alter);
+                                final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                                kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
                             } catch (KuduException e) {
                                 // Ignore the exception if the column already exists due to concurrent
                                 // threads or applications attempting to handle schema drift.
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 52ab00d..4f634fd 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kudu;
 
+import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
@@ -37,6 +38,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -47,6 +49,9 @@ public class MockPutKudu extends PutKudu {
     private KuduSession session;
     private LinkedList<Insert> insertQueue;
 
+    // Atomic reference is used as the set and use of the schema are in different thread
+    private AtomicReference<Schema> tableSchema = new AtomicReference<>();
+
     private boolean loggedIn = false;
     private boolean loggedOut = false;
 
@@ -102,7 +107,9 @@ public class MockPutKudu extends PutKudu {
         final KuduClient client = mock(KuduClient.class);
 
         try {
-            when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
+            final KuduTable kuduTable = mock(KuduTable.class);
+            when(client.openTable(anyString())).thenReturn(kuduTable);
+            when(kuduTable.getSchema()).thenReturn(tableSchema.get());
         } catch (final Exception e) {
             throw new AssertionError(e);
         }
@@ -173,7 +180,11 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected KuduSession createKuduSession(KuduClient client) {
+    protected KuduSession createKuduSession(final KuduClient client) {
         return session;
     }
+
+    void setTableSchema(final Schema tableSchema) {
+        this.tableSchema.set(tableSchema);
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 303a03b..97e3e3d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -25,9 +25,9 @@ import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowError;
 import org.apache.kudu.client.RowErrorsAndOverflowStatus;
-import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
@@ -60,6 +60,7 @@ import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -85,7 +86,7 @@ public class TestPutKudu {
     public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
     public static final String DEFAULT_MASTERS = "testLocalHost:7051";
     public static final String SKIP_HEAD_LINE = "false";
-    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal";
+    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal";
 
     private TestRunner testRunner;
 
@@ -122,9 +123,10 @@ public class TestPutKudu {
         readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+        readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3)));
 
         for (int i=0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, new BigDecimal(111.111D).add(BigDecimal.valueOf(i)));
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -260,6 +262,26 @@ public class TestPutKudu {
     }
 
     @Test
+    public void testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed() throws InitializationException, IOException {
+        // given
+        processor.setTableSchema(new Schema(Arrays.asList()));
+        createRecordReader(5);
+        final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true");
+        testRunner.enqueue("trigger", flowFileAttributes);
+
+        // when
+        testRunner.run();
+
+        // then
+        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
         createRecordReader(10);
 
@@ -288,7 +310,7 @@ public class TestPutKudu {
     public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
         createRecordReader(0);
         // add the favorite color as a string
-        readerFactory.addRecord(1, "name0", "0", "89.89");
+        readerFactory.addRecord(1, "name0", "0", "89.89", "111.111");
 
         final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();
 
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 3a4bd20..91fd7c2 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -207,6 +207,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
                 || RecordFieldType.BIGINT.equals(dataType)
                 || RecordFieldType.FLOAT.equals(dataType)
                 || RecordFieldType.DOUBLE.equals(dataType)
+                || RecordFieldType.DECIMAL.equals(dataType)
                 || RecordFieldType.BOOLEAN.equals(dataType);
 
     }
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 02a54b9..34bb657 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -29,8 +29,8 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.ListRecordSet;
@@ -47,6 +47,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Arrays;
@@ -71,17 +72,20 @@ public class TestPrometheusRecordSink {
 
         List<RecordField> recordFields = Arrays.asList(
                 new RecordField("field1", RecordFieldType.INT.getDataType()),
-                new RecordField("field2", RecordFieldType.STRING.getDataType())
+                new RecordField("field2", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)),
+                new RecordField("field3", RecordFieldType.STRING.getDataType())
         );
         RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
 
         Map<String, Object> row1 = new HashMap<>();
         row1.put("field1", 15);
-        row1.put("field2", "Hello");
+        row1.put("field2", BigDecimal.valueOf(12.34567D));
+        row1.put("field3", "Hello");
 
         Map<String, Object> row2 = new HashMap<>();
         row2.put("field1", 6);
-        row2.put("field2", "World!");
+        row2.put("field2", BigDecimal.valueOf(0.1234567890123456789D));
+        row2.put("field3", "World!");
 
         RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
                 new MapRecord(recordSchema, row1),
@@ -95,8 +99,10 @@ public class TestPrometheusRecordSink {
         assertEquals(2, writeResult.getRecordCount());
         assertEquals("Hello", writeResult.getAttributes().get("a"));
 
+
         final String content = getMetrics();
-        assertTrue(content.contains("field1{field2=\"Hello\",} 15.0\nfield1{field2=\"World!\",} 6.0\n"));
+        assertTrue(content.contains("field1{field3=\"Hello\",} 15.0\nfield1{field3=\"World!\",} 6.0\n"));
+        assertTrue(content.contains("field2{field3=\"Hello\",} 12.34567\nfield2{field3=\"World!\",} 0.12345678901234568\n"));
 
         try {
             sink.onStopped();
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
index 789ab00..55c0de4 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
@@ -122,6 +122,11 @@ public class RecordSinkHandler extends AbstractActionHandlerService{
             if (value.contains(".")) {
                 try {
                     final double doubleValue = Double.parseDouble(value);
+
+                    if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+                        return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
+                    }
+
                     if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
                         return RecordFieldType.DOUBLE.getDataType();
                     }
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
index 8566a2a..bd11534 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
@@ -34,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -79,10 +80,12 @@ public class TestRecordSinkHandler {
         final Map<String,String> attributes = new HashMap<>();
         final Map<String,Object> metrics = new HashMap<>();
         final String expectedMessage = "Records written to sink service:";
+        final BigDecimal bigDecimalValue = new BigDecimal(String.join("", Collections.nCopies(400, "1")) + ".2");
 
         attributes.put("sendZeroResults","false");
         metrics.put("jvmHeap","1000000");
         metrics.put("cpu","90");
+        metrics.put("custom", bigDecimalValue);
 
         final Action action = new Action();
         action.setType("SEND");
@@ -96,6 +99,7 @@ public class TestRecordSinkHandler {
         Map<String,Object> record = rows.get(0);
         assertEquals("90", (record.get("cpu")));
         assertEquals("1000000", (record.get("jvmHeap")));
+        assertEquals(bigDecimalValue, (record.get("custom")));
     }
 
     private static class MockRecordSinkHandler extends RecordSinkHandler {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 5f8328f..cf63a70 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -365,6 +365,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
                 case FLOAT:
                 case INT:
                 case BIGINT:
+                case DECIMAL:
                 case LONG:
                 case SHORT:
                 case STRING:
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
index 60fa072..af1f2a4 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -404,7 +404,7 @@ public class SolrUtils {
                 continue;
             }else {
                 final DataType dataType = schema.getDataType(field.getFieldName()).get();
-                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
+                writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
             }
         }
     }
@@ -462,6 +462,9 @@ public class SolrUtils {
                     addFieldToSolrDocument(inputDocument,fieldName, coercedValue,fieldsToIndex);
                 }
                 break;
+            case DECIMAL:
+                addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toBigDecimal(coercedValue, fieldName), fieldsToIndex);
+                break;
             case BOOLEAN:
                 final String stringValue = coercedValue.toString();
                 if ("true".equalsIgnoreCase(stringValue)) {
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
new file mode 100644
index 0000000..edc678e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SolrUtilsTest {
+
+    @Mock
+    private SolrInputDocument inputDocument;
+
+    @Test
+    public void test() throws Exception {
+        // given
+        final String value = "12345678901234567890.123456789012345678901234567890";
+        final BigDecimal bigDecimalValue = new BigDecimal(value);
+        final List<RecordField> fields = Collections.singletonList(new RecordField("test", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("test", bigDecimalValue);
+
+        final Record record = new MapRecord(new SimpleRecordSchema(fields), values);
+        final List<String> fieldsToIndex = Collections.singletonList("parent_test");
+
+        // when
+        SolrUtils.writeRecord(record, inputDocument, fieldsToIndex, "parent");
+
+        // then
+        Mockito.verify(inputDocument, Mockito.times(1)).addField("parent_test", bigDecimalValue);
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 18cbc63..4f0fec2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -46,6 +46,7 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 
 import java.lang.reflect.Type;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -223,6 +224,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
                 return typeFactory.createJavaType(HashMap.class);
             case BIGINT:
                 return typeFactory.createJavaType(BigInteger.class);
+            case DECIMAL:
+                return typeFactory.createJavaType(BigDecimal.class);
             case CHOICE:
                 final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
                 DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index d888e3c..f17fe3c 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -133,6 +133,7 @@
                 <configuration>
                     <excludes combine.children="append">
                         <exclude>src/test/resources/avro/datatypes.avsc</exclude>
+                        <exclude>src/test/resources/avro/decimals.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
                         <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
index 746b1ce..f073464 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
@@ -104,6 +104,7 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
             case LONG:
             case FLOAT:
             case DOUBLE:
+            case DECIMAL:
             case BYTE:
             case CHAR:
             case SHORT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index 10b18a9..bc25744 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -89,6 +89,11 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
             if (value.contains(".")) {
                 try {
                     final double doubleValue = Double.parseDouble(value);
+
+                    if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+                        return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
+                    }
+
                     if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
                         return RecordFieldType.DOUBLE.getDataType();
                     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 6526ebe..733d3a5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -156,6 +156,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
         switch (fieldType) {
             case BIGINT:
             case BYTE:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case LONG:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
index 02587cc..10b4129 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
@@ -23,7 +23,9 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.DecimalNode;
 
+import java.math.BigDecimal;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
@@ -62,6 +64,12 @@ public class JsonSchemaInference extends HierarchicalSchemaInference<JsonNode> {
             return RecordFieldType.LONG.getDataType();
         }
 
+        if (jsonNode.isBigDecimal()) {
+            final DecimalNode decimalNode = (DecimalNode) jsonNode;
+            final BigDecimal value = decimalNode.getDecimalValue();
+            return RecordFieldType.DECIMAL.getDecimalDataType(value.precision(), value.scale());
+        }
+
         if (jsonNode.isFloatingPointNumber()) {
             return RecordFieldType.DOUBLE.getDataType();
         }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index d0172e9..469eb80 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -160,6 +160,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index dd7eaec..80c8512 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -376,6 +376,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
             case STRING:
                 generator.writeString(coercedValue.toString());
                 break;
+            case DECIMAL:
+                generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
+                break;
             case BIGINT:
                 if (coercedValue instanceof Long) {
                     generator.writeNumber(((Long) coercedValue).longValue());
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
index 16a1e9e..8b4710b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
@@ -250,6 +250,7 @@ public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSet
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
index 4a2eacb..2cb165b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
@@ -145,6 +145,7 @@ public class XMLRecordReader implements RecordReader {
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
@@ -528,6 +529,7 @@ public class XMLRecordReader implements RecordReader {
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
index 134e9a2..0e2ccab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
@@ -57,6 +57,11 @@ public class XmlSchemaInference extends HierarchicalSchemaInference<XmlNode> {
             if (text.contains(".")) {
                 try {
                     final double doubleValue = Double.parseDouble(text);
+
+                    if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+                        return RecordFieldType.DECIMAL.getDecimalDataType(text.length() - 1, text.length() - 1 - text.indexOf("."));
+                    }
+
                     if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
                         return RecordFieldType.DOUBLE.getDataType();
                     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index b5fd869..d310d4f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -113,7 +113,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMillis").get().getFieldType());
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMicros").get().getFieldType());
             assertEquals(RecordFieldType.DATE, recordSchema.getDataType("date").get().getFieldType());
-            assertEquals(RecordFieldType.DOUBLE, recordSchema.getDataType("decimal").get().getFieldType());
+            assertEquals(RecordFieldType.DECIMAL, recordSchema.getDataType("decimal").get().getFieldType());
 
             final Record record = reader.nextRecord();
             assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMillis"));
@@ -123,7 +123,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
             noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
             assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
-            assertEquals(bigDecimal.doubleValue(), record.getValue("decimal"));
+            assertEquals(bigDecimal, record.getValue("decimal"));
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 7a35ba5..a25e80e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -33,6 +33,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -139,6 +140,53 @@ public abstract class TestWriteAvroResult {
     }
 
     @Test
+    public void testDecimalType() throws IOException {
+        final Object[][] decimals = new Object[][] {
+                // id, record field, value, expected value
+
+                // Uses the whole precision and scale
+                {1, RecordFieldType.DECIMAL.getDecimalDataType(10, 2),  new BigDecimal("12345678.12"),  new BigDecimal("12345678.12")},
+
+                // Uses less precision and scale than allowed
+                {2, RecordFieldType.DECIMAL.getDecimalDataType(10, 2),  new BigDecimal("123456.1"),  new BigDecimal("123456.10")},
+
+                // Record schema uses smaller precision and scale than allowed
+                {3, RecordFieldType.DECIMAL.getDecimalDataType(8, 1),  new BigDecimal("123456.1"),  new BigDecimal("123456.10")},
+
+                // Record schema uses bigger precision and scale than allowed
+                {4, RecordFieldType.DECIMAL.getDecimalDataType(16, 4),  new BigDecimal("123456.1"),  new BigDecimal("123456.10")},
+        };
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/decimals.avsc"));
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final List<RecordField> fields = new ArrayList<>();
+        final Map<String, Object> values = new HashMap<>();
+
+        for (final Object[] decimal : decimals) {
+            fields.add(new RecordField("decimal" + decimal[0], (DataType) decimal[1]));
+            values.put("decimal" + decimal[0], decimal[2]);
+        }
+
+        final Record record = new MapRecord(new SimpleRecordSchema(fields), values);
+
+        try (final RecordSetWriter writer = createWriter(schema, baos)) {
+            writer.write(RecordSet.of(record.getSchema(), record));
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        try (final InputStream in = new ByteArrayInputStream(data)) {
+            final GenericRecord avroRecord = readRecord(in, schema);
+
+            for (final Object[] decimal : decimals) {
+                final Schema decimalSchema = schema.getField("decimal" + decimal[0]).schema();
+                final LogicalType logicalType = decimalSchema.getLogicalType();
+                Assert.assertEquals(decimal[3], new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal" + decimal[0]), decimalSchema, logicalType));
+            }
+        }
+    }
+
+    @Test
     public void testLogicalTypes() throws IOException, ParseException {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
         testLogicalTypes(schema);
@@ -159,8 +207,7 @@ public abstract class TestWriteAvroResult {
         fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
-        // Avro decimal is represented as double in NiFi type system.
-        fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(5,2)));
         final RecordSchema recordSchema = new SimpleRecordSchema(fields);
 
         final String expectedTime = "2017-04-04 14:20:33.789";
@@ -174,9 +221,7 @@ public abstract class TestWriteAvroResult {
         values.put("timestampMillis", new Timestamp(timeLong));
         values.put("timestampMicros", new Timestamp(timeLong));
         values.put("date", new Date(timeLong));
-        // Avro decimal is represented as double in NiFi type system.
-        final BigDecimal expectedDecimal = new BigDecimal("123.45");
-        values.put("decimal", expectedDecimal.doubleValue());
+        values.put("decimal", new BigDecimal("123.45"));
         final Record record = new MapRecord(recordSchema, values);
 
         try (final RecordSetWriter writer = createWriter(schema, baos)) {
@@ -201,7 +246,7 @@ public abstract class TestWriteAvroResult {
                     // Union type doesn't return logical type. Find the first logical type defined within the union.
                     : decimalSchema.getTypes().stream().map(s -> s.getLogicalType()).filter(Objects::nonNull).findFirst().get();
             final BigDecimal decimal = new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal"), decimalSchema, logicalType);
-            assertEquals(expectedDecimal, decimal);
+            assertEquals(new BigDecimal("123.45"), decimal);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index 230a0e3..ca36430 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -36,6 +36,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Time;
@@ -45,6 +46,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -120,6 +122,26 @@ public class TestCSVRecordReader {
     }
 
     @Test
+    public void testBigDecimal() throws IOException, MalformedRecordException {
+        final String value = String.join("", Collections.nCopies(500, "1")) + ".2";
+        final String text = "decimal\n" + value;
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
+             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), StandardCharsets.UTF_8.name())) {
+
+            final Record record = reader.nextRecord();
+            final BigDecimal result = (BigDecimal)record.getValue("decimal");
+
+            assertEquals(new BigDecimal(value), result);
+        }
+    }
+
+    @Test
     public void testDateNoCoersionExpectedFormat() throws IOException, MalformedRecordException {
         final String text = "date\n11/30/1983";
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index bb3d0fe..d5569f0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
@@ -126,6 +127,7 @@ public class TestWriteCSVResult {
             valueMap.put("long", 8L);
             valueMap.put("float", 8.0F);
             valueMap.put("double", 8.0D);
+            valueMap.put("decimal", BigDecimal.valueOf(8.1D));
             valueMap.put("date", new Date(now));
             valueMap.put("time", new Time(now));
             valueMap.put("timestamp", new Timestamp(now));
@@ -154,7 +156,7 @@ public class TestWriteCSVResult {
 
         final String values = splits[1];
         final StringBuilder expectedBuilder = new StringBuilder();
-        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
+        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
 
         final String expectedValues = expectedBuilder.toString();
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 0e50764..82b5b87 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -75,10 +75,10 @@ public class TestJsonSchemaInference {
         assertSame(RecordFieldType.LONG, schema.getDataType("setc").get().getFieldType());
         assertSame(RecordFieldType.LONG, schema.getDataType("boolc").get().getFieldType());
         assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("binaryc").get());
+        // We currently do not read BigDecimal from JSON as ObjectMapper in InferenceSchemaStrategy automatically reads it as double
 
         final List<String> fieldNames = schema.getFieldNames();
         assertEquals(Arrays.asList("varcharc", "uuid", "tinyintc", "textc", "datec", "smallintc", "mediumintc", "intc", "bigintc",
                 "floatc", "doublec", "decimalc", "timestampc", "timec", "charc", "tinytextc", "blobc", "mediumtextc", "enumc", "setc", "boolc", "binaryc"), fieldNames);
     }
-
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 759a5ff..bea213f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -38,6 +38,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,10 +60,14 @@ public class TestJsonTreeRowRecordReader {
     private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
 
     private List<RecordField> getDefaultFields() {
+        return getFields(RecordFieldType.DOUBLE.getDataType());
+    }
+
+    private List<RecordField> getFields(final DataType balanceDataType) {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("balance", balanceDataType));
         fields.add(new RecordField("address", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("city", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("state", RecordFieldType.STRING.getDataType()));
@@ -249,7 +254,8 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testReadMultilineJSON() throws IOException, MalformedRecordException {
-        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
@@ -260,14 +266,14 @@ public class TestJsonTreeRowRecordReader {
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
             final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+                    RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
-            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
 
             final Object[] secondRecordValues = reader.nextRecord().getValues();
-            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
 
             assertNull(reader.nextRecord());
         }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index b941c09..e8b5cef 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -35,6 +35,7 @@ import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -94,6 +95,7 @@ public class TestWriteJsonResult {
         valueMap.put("long", 8L);
         valueMap.put("float", 8.0F);
         valueMap.put("double", 8.0D);
+        valueMap.put("decimal", BigDecimal.valueOf(8.1D));
         valueMap.put("date", new Date(time));
         valueMap.put("time", new Time(time));
         valueMap.put("timestamp", new Timestamp(time));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
index caf0038..682abcd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
@@ -144,6 +144,26 @@ public class TestFieldTypeInference {
         runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
     }
 
+    @Test
+    public void testToDataTypeWhenDecimal() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.DECIMAL.getDecimalDataType(10, 1),
+                RecordFieldType.DECIMAL.getDecimalDataType(10, 3),
+                RecordFieldType.DECIMAL.getDecimalDataType(7, 3),
+                RecordFieldType.DECIMAL.getDecimalDataType(7, 5),
+                RecordFieldType.DECIMAL.getDecimalDataType(7, 7),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType()
+        );
+
+        DataType expected = RecordFieldType.DECIMAL.getDecimalDataType(10, 7);
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
     private SimpleRecordSchema createRecordSchema(String fieldName, DataType fieldType) {
         return new SimpleRecordSchema(Arrays.asList(
                 new RecordField(fieldName, fieldType)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
index 6ab9229..0cdd217 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
@@ -189,6 +189,7 @@ public class TestWriteXMLResult {
         valueMap.put("long", 8L);
         valueMap.put("float", 8.0F);
         valueMap.put("double", 8.0D);
+        valueMap.put("decimal", 8.1D);
         valueMap.put("date", new Date(time));
         valueMap.put("time", new Time(time));
         valueMap.put("timestamp", new Timestamp(time));
@@ -207,8 +208,8 @@ public class TestWriteXMLResult {
         writer.flush();
 
         String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><char>c</char><short>8</short>" +
-                "<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><date>2017-01-01</date>" +
-                "<time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
+                "<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><decimal>8.1</decimal>" +
+                "<date>2017-01-01</date><time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
                 "<map><height>48</height><width>96</width></map></RECORD></ROOT>";
 
         assertThat(xmlResult, CompareMatcher.isSimilarTo(out.toString()).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
index ef3d692..252b572 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
@@ -33,6 +33,7 @@ import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -401,6 +402,22 @@ public class TestXMLRecordReader {
     }
 
     @Test
+    public void testSimpleRecordWithAttribute6() throws IOException, MalformedRecordException {
+        // given
+        final InputStream is = new FileInputStream("src/test/resources/xml/people2.xml");
+        final List<RecordField> fields = getSimpleRecordFields();
+        fields.add(new RecordField("ID", RecordFieldType.DECIMAL.getDecimalDataType(38, 10)));
+        final XMLRecordReader reader = new XMLRecordReader(is, new SimpleRecordSchema(fields), true,
+                null, "CONTENT", dateFormat, timeFormat, timestampFormat, Mockito.mock(ComponentLog.class));
+
+        // when
+        final Record record = reader.nextRecord(true, false);
+
+        // then
+        assertEquals(BigDecimal.class, record.getValue("ID").getClass());
+    }
+
+    @Test
     public void testSimpleRecordWithAttributeCoerceFalseDropFalse() throws IOException, MalformedRecordException {
         InputStream is = new FileInputStream("src/test/resources/xml/people.xml");
         List<RecordField> fields = getSimpleRecordFields();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc
new file mode 100644
index 0000000..7529378
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc
@@ -0,0 +1,36 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+        "name" : "decimal1", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }, {
+        "name" : "decimal2", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }, {
+        "name" : "decimal3", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }, {
+		"name" : "decimal4", "type": {
+			"type" : "bytes",
+			"logicalType" : "decimal",
+			"precision" : 10,
+			"scale" : 2
+        }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
index 0472512..e6de436 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
@@ -7,6 +7,7 @@
   "bigint" : 8,
   "float" : 8.0,
   "double" : 8.0,
+  "decimal" : 8.1,
   "timestamp" : "2017-01-01 17:00:00",
   "date" : "2017-01-01",
   "time" : "17:00:00",