Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/02 19:13:58 UTC

[nifi] branch master updated (91dd59d -> e0dd6d4)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 91dd59d  NIFI-7312: Enable search in variable registry of root process group
     new 5c2bfcf  NIFI-7369 Adding decimal support for record handling in order to avoid missing precision when reading in records
     new e0dd6d4  NIFI-7369: Consider DECIMAL type as a numeric type when using a CHOICE type in QueryRecord

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
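
For context on NIFI-7369: record readers previously coerced decimal values (for
example Avro's decimal logical type) to Double, which silently drops digits once
a value exceeds the roughly 15 to 17 significant decimal digits a double can
represent. A minimal, standalone sketch of the precision loss the first commit
avoids, in plain Java, reusing the value from TestAvroTypeUtil below (the demo
class name is invented for illustration):

    import java.math.BigDecimal;

    public class PrecisionLossDemo {
        public static void main(String[] args) {
            final BigDecimal exact = new BigDecimal("1234567890.12345678"); // 18 significant digits

            // Coercing to double keeps only ~17 of them; the trailing 8 is lost.
            final double lossy = exact.doubleValue();
            System.out.println(Double.toString(lossy)); // 1.2345678901234567E9

            // Round-tripping through double therefore no longer compares equal.
            final BigDecimal roundTripped = new BigDecimal(Double.toString(lossy));
            System.out.println(exact.compareTo(roundTripped) == 0); // false
        }
    }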


Summary of changes:
 .../nifi/serialization/record/RecordFieldType.java |  19 ++-
 .../serialization/record/ResultSetRecordSet.java   |  29 +++-
 .../{MapDataType.java => DecimalDataType.java}     |  39 ++---
 .../serialization/record/util/DataTypeUtils.java   |  94 +++++++++++-
 .../record/ResultSetRecordSetTest.java             | 165 +++++++++++++++++++++
 .../serialization/record/TestDataTypeUtils.java    | 136 +++++++++++++++++
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   3 +
 .../TestPutElasticsearchHttpRecord.java            |   4 +-
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    |  11 +-
 .../schema/access/InferenceSchemaStrategy.java     |   4 +
 .../org/apache/nifi/avro/TestAvroTypeUtil.java     |  42 +++++-
 .../schema/access/InferenceSchemaStrategyTest.java | 143 ++++++++++++++++++
 .../schema/validation/StandardSchemaValidator.java |  10 +-
 .../validation/TestStandardSchemaValidator.java    |  13 +-
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |   6 +
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java |   3 +
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |  15 ++
 .../org/apache/nifi/processors/orc/PutORCTest.java |  15 +-
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java |  20 +++
 .../nifi/controller/kudu/KuduLookupService.java    |   6 +-
 .../processors/kudu/AbstractKuduProcessor.java     |  38 ++++-
 .../org/apache/nifi/processors/kudu/PutKudu.java   |   9 +-
 .../apache/nifi/processors/kudu/MockPutKudu.java   |  15 +-
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  30 +++-
 .../reporting/prometheus/PrometheusRecordSink.java |   1 +
 .../prometheus/TestPrometheusRecordSink.java       |  16 +-
 .../nifi/rules/handlers/RecordSinkHandler.java     |   5 +
 .../nifi/rules/handlers/TestRecordSinkHandler.java |   4 +
 .../reporting/AbstractSiteToSiteReportingTask.java |   1 +
 .../org/apache/nifi/processors/solr/SolrUtils.java |   5 +-
 .../apache/nifi/processors/solr/SolrUtilsTest.java |  62 ++++++++
 .../org/apache/nifi/queryrecord/FlowFileTable.java |   4 +
 .../nifi-record-serialization-services/pom.xml     |   1 +
 .../apache/nifi/csv/AbstractCSVRecordReader.java   |   1 +
 .../org/apache/nifi/csv/CSVSchemaInference.java    |   5 +
 .../java/org/apache/nifi/csv/WriteCSVResult.java   |   1 +
 .../org/apache/nifi/json/JsonSchemaInference.java  |   8 +
 .../apache/nifi/json/JsonTreeRowRecordReader.java  |   1 +
 .../java/org/apache/nifi/json/WriteJsonResult.java |   3 +
 .../java/org/apache/nifi/xml/WriteXMLResult.java   |   1 +
 .../java/org/apache/nifi/xml/XMLRecordReader.java  |   2 +
 .../nifi/xml/inference/XmlSchemaInference.java     |   5 +
 .../avro/TestAvroReaderWithEmbeddedSchema.java     |   4 +-
 .../org/apache/nifi/avro/TestWriteAvroResult.java  |  57 ++++++-
 .../org/apache/nifi/csv/TestCSVRecordReader.java   |  22 +++
 .../org/apache/nifi/csv/TestWriteCSVResult.java    |   4 +-
 .../apache/nifi/json/TestJsonSchemaInference.java  |   2 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java     |  16 +-
 .../org/apache/nifi/json/TestWriteJsonResult.java  |   2 +
 .../schema/inference/TestFieldTypeInference.java   |  20 +++
 .../org/apache/nifi/xml/TestWriteXMLResult.java    |   5 +-
 .../org/apache/nifi/xml/TestXMLRecordReader.java   |  17 +++
 .../src/test/resources/avro/decimals.avsc          |  36 +++++
 .../src/test/resources/json/output/dataTypes.json  |   1 +
 54 files changed, 1098 insertions(+), 83 deletions(-)
 copy nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/{MapDataType.java => DecimalDataType.java} (61%)
 create mode 100644 nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
 create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
 create mode 100644 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc


[nifi] 02/02: NIFI-7369: Consider DECIMAL type as a numeric type when using a CHOICE type in QueryRecord

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit e0dd6d466e07b3c41fc64bcc40f5e4abe7680c71
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Jun 2 15:12:54 2020 -0400

    NIFI-7369: Consider DECIMAL type as a numeric type when using a CHOICE type in QueryRecord
    
    This closes #4223.
---
 .../src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java         | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 4f0fec2..20c2dbf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -279,6 +279,7 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
         switch (dataType.getFieldType()) {
             case BIGINT:
             case BYTE:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
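
The hunk above is small but load-bearing: this switch groups the record field
types that QueryRecord may treat as a single numeric SQL column when they appear
together in a CHOICE type. A reduced sketch of the pattern, illustrative only
and not the actual FlowFileTable code (the class and helper names are made up):

    import org.apache.nifi.serialization.record.RecordFieldType;

    public final class NumericTypeCheckDemo {
        public static void main(String[] args) {
            System.out.println(isNumeric(RecordFieldType.DECIMAL)); // true, after this commit
            System.out.println(isNumeric(RecordFieldType.STRING));  // false
        }

        // Mirrors the grouping in the switch above: with DECIMAL included, a
        // CHOICE of e.g. [INT, DECIMAL] maps to one numeric column instead of
        // falling through to the default, non-numeric mapping.
        static boolean isNumeric(final RecordFieldType type) {
            switch (type) {
                case BIGINT:
                case BYTE:
                case DECIMAL: // added by this commit
                case DOUBLE:
                case FLOAT:
                case INT:
                case LONG:
                case SHORT:
                    return true;
                default:
                    return false;
            }
        }
    }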


[nifi] 01/02: NIFI-7369 Adding decimal support for record handling in order to avoid missing precision when reading in records

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 5c2bfcf7d3b8fdafe70ee4920645bb3e8fa5e539
Author: Bence Simon <si...@gmail.com>
AuthorDate: Wed Apr 22 15:48:10 2020 +0200

    NIFI-7369 Adding decimal support for record handling in order to avoid missing precision when reading in records
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../nifi/serialization/record/RecordFieldType.java |  19 ++-
 .../serialization/record/ResultSetRecordSet.java   |  29 +++-
 .../serialization/record/type/DecimalDataType.java |  70 +++++++++
 .../serialization/record/util/DataTypeUtils.java   |  94 +++++++++++-
 .../record/ResultSetRecordSetTest.java             | 165 +++++++++++++++++++++
 .../serialization/record/TestDataTypeUtils.java    | 136 +++++++++++++++++
 .../elasticsearch/PutElasticsearchHttpRecord.java  |   3 +
 .../TestPutElasticsearchHttpRecord.java            |   4 +-
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    |  11 +-
 .../schema/access/InferenceSchemaStrategy.java     |   4 +
 .../org/apache/nifi/avro/TestAvroTypeUtil.java     |  42 +++++-
 .../schema/access/InferenceSchemaStrategyTest.java | 143 ++++++++++++++++++
 .../schema/validation/StandardSchemaValidator.java |  10 +-
 .../validation/TestStandardSchemaValidator.java    |  13 +-
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |   6 +
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java |   3 +
 .../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java |  15 ++
 .../org/apache/nifi/processors/orc/PutORCTest.java |  15 +-
 .../org/apache/nifi/util/orc/TestNiFiOrcUtils.java |  20 +++
 .../nifi/controller/kudu/KuduLookupService.java    |   6 +-
 .../processors/kudu/AbstractKuduProcessor.java     |  38 ++++-
 .../org/apache/nifi/processors/kudu/PutKudu.java   |   9 +-
 .../apache/nifi/processors/kudu/MockPutKudu.java   |  15 +-
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  30 +++-
 .../reporting/prometheus/PrometheusRecordSink.java |   1 +
 .../prometheus/TestPrometheusRecordSink.java       |  16 +-
 .../nifi/rules/handlers/RecordSinkHandler.java     |   5 +
 .../nifi/rules/handlers/TestRecordSinkHandler.java |   4 +
 .../reporting/AbstractSiteToSiteReportingTask.java |   1 +
 .../org/apache/nifi/processors/solr/SolrUtils.java |   5 +-
 .../apache/nifi/processors/solr/SolrUtilsTest.java |  62 ++++++++
 .../org/apache/nifi/queryrecord/FlowFileTable.java |   3 +
 .../nifi-record-serialization-services/pom.xml     |   1 +
 .../apache/nifi/csv/AbstractCSVRecordReader.java   |   1 +
 .../org/apache/nifi/csv/CSVSchemaInference.java    |   5 +
 .../java/org/apache/nifi/csv/WriteCSVResult.java   |   1 +
 .../org/apache/nifi/json/JsonSchemaInference.java  |   8 +
 .../apache/nifi/json/JsonTreeRowRecordReader.java  |   1 +
 .../java/org/apache/nifi/json/WriteJsonResult.java |   3 +
 .../java/org/apache/nifi/xml/WriteXMLResult.java   |   1 +
 .../java/org/apache/nifi/xml/XMLRecordReader.java  |   2 +
 .../nifi/xml/inference/XmlSchemaInference.java     |   5 +
 .../avro/TestAvroReaderWithEmbeddedSchema.java     |   4 +-
 .../org/apache/nifi/avro/TestWriteAvroResult.java  |  57 ++++++-
 .../org/apache/nifi/csv/TestCSVRecordReader.java   |  22 +++
 .../org/apache/nifi/csv/TestWriteCSVResult.java    |   4 +-
 .../apache/nifi/json/TestJsonSchemaInference.java  |   2 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java     |  16 +-
 .../org/apache/nifi/json/TestWriteJsonResult.java  |   2 +
 .../schema/inference/TestFieldTypeInference.java   |  20 +++
 .../org/apache/nifi/xml/TestWriteXMLResult.java    |   5 +-
 .../org/apache/nifi/xml/TestXMLRecordReader.java   |  17 +++
 .../src/test/resources/avro/decimals.avsc          |  36 +++++
 .../src/test/resources/json/output/dataTypes.json  |   1 +
 54 files changed, 1146 insertions(+), 65 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index 18b0781..4e18fb9 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
 
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 
@@ -73,6 +74,11 @@ public enum RecordFieldType {
     DOUBLE("double", FLOAT),
 
     /**
+     * A decimal field type. Fields of this type use a {@code java.math.BigDecimal} value.
+     */
+    DECIMAL("decimal", FLOAT, DOUBLE),
+
+    /**
      * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
      */
     TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
@@ -95,7 +101,7 @@ public enum RecordFieldType {
     /**
      * A String field type. Fields of this type use a {@code java.lang.String} value.
      */
-    STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP),
+    STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP),
 
     /**
      * <p>
@@ -349,6 +355,17 @@ public enum RecordFieldType {
     }
 
     /**
+     * Returns a Data Type that represents a decimal type with the given precision and scale.
+     *
+     * @param precision the precision of the decimal
+     * @param scale the scale of the decimal
+     * @return a DataType that represents a decimal with added information about its precision and scale.
+     */
+    public DataType getDecimalDataType(final int precision, final int scale) {
+        return new DecimalDataType(precision, scale);
+    }
+
+    /**
     * Determines whether or not this RecordFieldType is "wider" than the provided type. A type "A" is said to be wider
      * than another type "B" iff A encompasses all values of B and more. For example, the LONG type is wider than INT, and INT
      * is wider than SHORT. "Complex" types (MAP, RECORD, ARRAY, CHOICE) are not wider than any other type, and no other type is
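
The new factory mirrors getArrayDataType and getMapDataType on the same enum:
callers ask the DECIMAL constant for a DataType parameterized with precision and
scale. A short usage sketch assuming only the classes added in this commit (the
demo class itself is hypothetical):

    import java.math.BigDecimal;
    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.type.DecimalDataType;

    public class DecimalTypeDemo {
        public static void main(String[] args) {
            // Fixed precision/scale, e.g. for a currency column.
            final DataType currency = RecordFieldType.DECIMAL.getDecimalDataType(10, 2);
            System.out.println(currency); // field type name plus precision/scale, e.g. DECIMAL[10,2]

            // Precision and scale can also be derived from a concrete value,
            // as ResultSetRecordSet does for NUMERIC/DECIMAL columns below.
            final BigDecimal value = new BigDecimal("12345678.90");
            final DecimalDataType derived = (DecimalDataType)
                    RecordFieldType.DECIMAL.getDecimalDataType(value.precision(), value.scale());
            System.out.println(derived.getPrecision() + "," + derived.getScale()); // 10,2
        }
    }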
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 953b511..3b98657 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Array;
 import java.sql.ResultSet;
@@ -53,6 +54,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final String DATE_CLASS_NAME = Date.class.getName();
     private static final String DOUBLE_CLASS_NAME = Double.class.getName();
     private static final String FLOAT_CLASS_NAME = Float.class.getName();
+    private static final String BIGDECIMAL_CLASS_NAME = BigDecimal.class.getName();
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         this.rs = rs;
@@ -194,6 +196,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
                 return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                final BigDecimal bigDecimal = rs.getBigDecimal(columnIndex);
+                return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
             case Types.OTHER: {
                 // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
                 if (rs.isAfterLast()) {
@@ -212,8 +218,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 final Object obj = rs.getObject(columnIndex);
                 if (!(obj instanceof Record)) {
                     final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
-                        RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME,
-                        RecordFieldType.TIMESTAMP)
+                        RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING,
+                            RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
                     .map(RecordFieldType::getDataType)
                     .collect(Collectors.toList());
 
@@ -234,7 +240,14 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                     }
                 }
 
-                return getFieldType(sqlType, rs.getMetaData().getColumnClassName(columnIndex)).getDataType();
+                final RecordFieldType fieldType = getFieldType(sqlType, rs.getMetaData().getColumnClassName(columnIndex));
+
+                if (RecordFieldType.DECIMAL.equals(fieldType)) {
+                    final BigDecimal bigDecimalValue = rs.getBigDecimal(columnIndex);
+                    return fieldType.getDecimalDataType(bigDecimalValue.precision(), bigDecimalValue.scale());
+                } else {
+                    return fieldType.getDataType();
+                }
             }
         }
     }
@@ -310,6 +323,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             if (valueToLookAt instanceof Double) {
                 return RecordFieldType.DOUBLE.getDataType();
             }
+            if (valueToLookAt instanceof BigDecimal) {
+                final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
+                return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
+            }
             if (valueToLookAt instanceof Boolean) {
                 return RecordFieldType.BOOLEAN.getDataType();
             }
@@ -353,9 +370,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 return RecordFieldType.CHAR;
             case Types.DATE:
                 return RecordFieldType.DATE;
+            case Types.NUMERIC:
             case Types.DECIMAL:
+                return RecordFieldType.DECIMAL;
             case Types.DOUBLE:
-            case Types.NUMERIC:
             case Types.REAL:
                 return RecordFieldType.DOUBLE;
             case Types.FLOAT:
@@ -393,6 +411,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
                 if (DOUBLE_CLASS_NAME.equals(valueClassName)) {
                     return RecordFieldType.DOUBLE;
                 }
+                if (BIGDECIMAL_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.DECIMAL;
+                }
 
                 return RecordFieldType.RECORD;
             case Types.TIME:
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java
new file mode 100644
index 0000000..4020bcb
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class DecimalDataType extends DataType {
+    private final int precision;
+    private final int scale;
+
+    public DecimalDataType(final int precision, final int scale) {
+        super(RecordFieldType.DECIMAL, null);
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    public int getPrecision() {
+        return precision;
+    }
+
+    public int getScale() {
+        return scale;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 31;
+        hash = 41 * hash + getFieldType().hashCode();
+        hash = 41 * hash + precision;
+        hash = 41 * hash + scale;
+        return hash;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof DecimalDataType)) {
+            return false;
+        }
+
+        final DecimalDataType other = (DecimalDataType) obj;
+        return getPrecision() == other.getPrecision() && getScale() == other.getScale();
+    }
+
+    @Override
+    public String toString() {
+        return getFieldType().toString() + "[" + precision + "," + scale + "]";
+    }
+}
+
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f644fda..e1ee122 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -26,6 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.io.InputStream;
 import java.io.Reader;
 import java.lang.reflect.Array;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -93,7 +95,14 @@ public class DataTypeUtils {
             "(" + Base10Decimal + OptionalBase10Exponent + ")" +
         ")";
 
+    private static final String decimalRegex =
+        OptionalSign +
+            "(" + Base10Digits + OptionalBase10Decimal + ")" + "|" +
+            "(" + Base10Digits + OptionalBase10Decimal + Base10Exponent + ")" + "|" +
+            "(" + Base10Decimal + OptionalBase10Exponent + ")";
+
     private static final Pattern FLOATING_POINT_PATTERN = Pattern.compile(doubleRegex);
+    private static final Pattern DECIMAL_PATTERN = Pattern.compile(decimalRegex);
 
     private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
 
@@ -127,6 +136,7 @@ public class DataTypeUtils {
         NUMERIC_VALIDATORS.put(RecordFieldType.SHORT, value -> value instanceof Short);
         NUMERIC_VALIDATORS.put(RecordFieldType.DOUBLE, value -> value instanceof Double);
         NUMERIC_VALIDATORS.put(RecordFieldType.FLOAT, value -> value instanceof Float);
+        NUMERIC_VALIDATORS.put(RecordFieldType.DECIMAL, value -> value instanceof BigDecimal);
     }
 
     public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
@@ -174,6 +184,8 @@ public class DataTypeUtils {
                 return toCharacter(value, fieldName);
             case DATE:
                 return toDate(value, dateFormat, fieldName);
+            case DECIMAL:
+                return toBigDecimal(value, fieldName);
             case DOUBLE:
                 return toDouble(value, fieldName);
             case FLOAT:
@@ -228,6 +240,8 @@ public class DataTypeUtils {
                 return isCharacterTypeCompatible(value);
             case DATE:
                 return isDateTypeCompatible(value, dataType.getFormat());
+            case DECIMAL:
+                return isDecimalTypeCompatible(value);
             case DOUBLE:
                 return isDoubleTypeCompatible(value);
             case FLOAT:
@@ -483,6 +497,10 @@ public class DataTypeUtils {
             if (value instanceof BigInteger) {
                 return RecordFieldType.BIGINT.getDataType();
             }
+            if (value instanceof BigDecimal) {
+                final BigDecimal bigDecimal = (BigDecimal) value;
+                return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
+            }
         }
 
         if (value instanceof Boolean) {
@@ -1232,6 +1250,10 @@ public class DataTypeUtils {
         return isNumberTypeCompatible(value, DataTypeUtils::isIntegral);
     }
 
+    public static boolean isDecimalTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, DataTypeUtils::isDecimal);
+    }
+
     public static Boolean toBoolean(final Object value, final String fieldName) {
         if (value == null) {
             return null;
@@ -1266,6 +1288,50 @@ public class DataTypeUtils {
         return false;
     }
 
+    public static BigDecimal toBigDecimal(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigDecimal) {
+            return (BigDecimal) value;
+        }
+
+        if (value instanceof Number) {
+            final Number number = (Number) value;
+
+            if (number instanceof Byte
+                    || number instanceof Short
+                    || number instanceof Integer
+                    || number instanceof Long) {
+                return BigDecimal.valueOf(number.longValue());
+            }
+
+            if (number instanceof BigInteger) {
+                return new BigDecimal((BigInteger) number);
+            }
+
+            if (number instanceof Float) {
+                return new BigDecimal(Float.toString((Float) number));
+            }
+
+            if (number instanceof Double) {
+                return new BigDecimal(Double.toString((Double) number));
+            }
+        }
+
+        if (value instanceof String) {
+            try {
+                return new BigDecimal((String) value);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigDecimal for field " + fieldName
+                        + ", value is not a valid representation of BigDecimal", nfe);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigDecimal for field " + fieldName);
+    }
+
     public static Double toDouble(final Object value, final String fieldName) {
         if (value == null) {
             return null;
@@ -1322,6 +1388,14 @@ public class DataTypeUtils {
         return isNumberTypeCompatible(value, s -> isFloatingPoint(s));
     }
 
+    private static boolean isDecimal(final String value) {
+        if (value == null || value.isEmpty()) {
+            return false;
+        }
+
+        return DECIMAL_PATTERN.matcher(value).matches();
+    }
+
     private static boolean isFloatingPoint(final String value) {
         if (value == null || value.isEmpty()) {
             return false;
@@ -1691,15 +1765,31 @@ public class DataTypeUtils {
             case FLOAT:
                 if (otherFieldType == RecordFieldType.DOUBLE) {
                     return Optional.of(otherDataType);
+                } else if (otherFieldType == RecordFieldType.DECIMAL) {
+                    return Optional.of(otherDataType);
                 }
                 break;
             case DOUBLE:
                 if (otherFieldType == RecordFieldType.FLOAT) {
                     return Optional.of(thisDataType);
+                } else if (otherFieldType == RecordFieldType.DECIMAL) {
+                    return Optional.of(otherDataType);
                 }
                 break;
+            case DECIMAL:
+                if (otherFieldType == RecordFieldType.DOUBLE) {
+                    return Optional.of(thisDataType);
+                } else if (otherFieldType == RecordFieldType.FLOAT) {
+                    return Optional.of(thisDataType);
+                } else if (otherFieldType == RecordFieldType.DECIMAL) {
+                    final DecimalDataType thisDecimalDataType = (DecimalDataType) thisDataType;
+                    final DecimalDataType otherDecimalDataType = (DecimalDataType) otherDataType;
 
-
+                    final int precision = Math.max(thisDecimalDataType.getPrecision(), otherDecimalDataType.getPrecision());
+                    final int scale = Math.max(thisDecimalDataType.getScale(), otherDecimalDataType.getScale());
+                    return Optional.of(RecordFieldType.DECIMAL.getDecimalDataType(precision, scale));
+                }
+                break;
             case CHAR:
                 if (otherFieldType == RecordFieldType.STRING) {
                     return Optional.of(otherDataType);
@@ -1759,6 +1849,8 @@ public class DataTypeUtils {
                 return Types.DOUBLE;
             case FLOAT:
                 return Types.FLOAT;
+            case DECIMAL:
+                return Types.NUMERIC;
             case INT:
                 return Types.INTEGER;
             case SHORT:
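
Two details in the DataTypeUtils hunks above are easy to miss: Float and Double
inputs are routed through their String form, so the resulting BigDecimal mirrors
the printed value rather than the raw binary expansion, and widening two DECIMAL
types takes the maximum of both precisions and both scales. A small sketch of
the conversion behavior under those assumptions (the demo class name is
invented):

    import java.math.BigDecimal;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class ToBigDecimalDemo {
        public static void main(String[] args) {
            // Float goes through Float.toString(), keeping the readable value...
            System.out.println(DataTypeUtils.toBigDecimal(0.1F, "field"));   // 0.1
            // ...while the raw BigDecimal(double) constructor exposes binary noise.
            System.out.println(new BigDecimal(0.1F));                        // 0.1000000014901161...

            // Byte/Short/Integer/Long go through BigDecimal.valueOf(long).
            System.out.println(DataTypeUtils.toBigDecimal(42L, "field"));    // 42

            // Strings must parse as plain decimals; "NaN" and "Infinity" throw
            // IllegalTypeConversionException.
            System.out.println(DataTypeUtils.toBigDecimal("12.000", "field")); // 12.000
        }
    }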
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
new file mode 100644
index 0000000..5c9a39c
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ResultSetRecordSetTest {
+    private static final Object[][] COLUMNS = new Object[][] {
+            // column number; column label / name / schema field; column type; schema data type;
+            {1, "varchar", Types.VARCHAR, RecordFieldType.STRING.getDataType()},
+            {2, "bigint", Types.BIGINT, RecordFieldType.LONG.getDataType()},
+            {3, "rowid", Types.ROWID, RecordFieldType.LONG.getDataType()},
+            {4, "bit", Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
+            {5, "boolean", Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
+            {6, "char", Types.CHAR, RecordFieldType.CHAR.getDataType()},
+            {7, "date", Types.DATE, RecordFieldType.DATE.getDataType()},
+            {8, "integer", Types.INTEGER, RecordFieldType.INT.getDataType()},
+            {9, "double", Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
+            {10, "real", Types.REAL, RecordFieldType.DOUBLE.getDataType()},
+            {11, "float", Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
+            {12, "smallint", Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
+            {13, "tinyint", Types.TINYINT, RecordFieldType.BYTE.getDataType()},
+            {14, "bigDecimal1", Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
+            {15, "bigDecimal2", Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
+            {16, "bigDecimal3", Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
+    };
+
+    @Mock
+    private ResultSet resultSet;
+
+    @Mock
+    private ResultSetMetaData resultSetMetaData;
+
+    @Before
+    public void setUp() throws SQLException {
+        Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
+
+        for (final Object[] column : COLUMNS) {
+            Mockito.when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((column[1]) + "Col");
+            Mockito.when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
+            Mockito.when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);
+        }
+
+        // BigDecimal values are necessary in order to determine precision and scale
+        Mockito.when(resultSet.getBigDecimal(14)).thenReturn(BigDecimal.valueOf(1234.567D));
+        Mockito.when(resultSet.getBigDecimal(15)).thenReturn(BigDecimal.valueOf(1234L));
+        Mockito.when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
+
+        // Column 16 (Types.JAVA_OBJECT) is resolved through a dedicated branch for Java objects, which also needs the column class name
+        Mockito.when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
+    }
+
+    @Test
+    public void testCreateSchema() throws SQLException {
+        // given
+        final RecordSchema recordSchema = givenRecordSchema();
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllColumnDataTypesAreCorrect(resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenNoRecordSchema() throws SQLException {
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, null);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllColumnDataTypesAreCorrect(resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherType() throws SQLException {
+        // given
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("column", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final ResultSet resultSet = givenResultSetForOther();
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        Assert.assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherTypeWithoutSchema() throws SQLException {
+        // given
+        final ResultSet resultSet = givenResultSetForOther();
+
+        // when
+        final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, null);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        Assert.assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
+    }
+
+    private ResultSet givenResultSetForOther() throws SQLException {
+        final ResultSet resultSet = Mockito.mock(ResultSet.class);
+        final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
+        Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(1);
+        Mockito.when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
+        Mockito.when(resultSetMetaData.getColumnName(1)).thenReturn("column");
+        Mockito.when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
+        return resultSet;
+    }
+
+    private RecordSchema givenRecordSchema() {
+        final List<RecordField> fields = new ArrayList<>();
+
+        for (final Object[] column : COLUMNS) {
+            fields.add(new RecordField((String) column[1], (DataType) column[3]));
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) {
+        Assert.assertNotNull(resultSchema);
+
+        for (final Object[] column : COLUMNS) {
+            Assert.assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 6d35789..d82be97 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -24,9 +24,11 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.DoubleAdder;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -266,6 +269,113 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testConvertToBigDecimalWhenInputIsValid() {
+        // given
+        final BigDecimal expectedValue = BigDecimal.valueOf(12L);
+
+        // when & then
+        whenExpectingValidBigDecimalConversion(expectedValue, BigDecimal.valueOf(12L));
+        whenExpectingValidBigDecimalConversion(expectedValue, (byte) 12);
+        whenExpectingValidBigDecimalConversion(expectedValue, (short) 12);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12L);
+        whenExpectingValidBigDecimalConversion(expectedValue, BigInteger.valueOf(12L));
+        whenExpectingValidBigDecimalConversion(expectedValue, 12F);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12.000F);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12D);
+        whenExpectingValidBigDecimalConversion(expectedValue, 12.000D);
+        whenExpectingValidBigDecimalConversion(expectedValue, "12");
+        whenExpectingValidBigDecimalConversion(expectedValue, "12.000");
+    }
+
+    @Test
+    public void testConvertToBigDecimalWhenNullInput() {
+        assertNull(DataTypeUtils.convertType(null, RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8));
+    }
+
+    @Test(expected = IllegalTypeConversionException.class)
+    public void testConvertToBigDecimalWhenInputStringIsInvalid() {
+        DataTypeUtils.convertType("test", RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+    }
+
+    @Test(expected = IllegalTypeConversionException.class)
+    public void testConvertToBigDecimalWhenUnsupportedType() {
+        DataTypeUtils.convertType(new ArrayList<Double>(), RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+    }
+
+    @Test(expected = IllegalTypeConversionException.class)
+    public void testConvertToBigDecimalWhenUnsupportedNumberType() {
+        DataTypeUtils.convertType(new DoubleAdder(), RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testCompatibleDataTypeBigDecimal() {
+        // given
+        final DataType dataType = RecordFieldType.DECIMAL.getDecimalDataType(30, 10);
+
+        // when & then
+        assertTrue(DataTypeUtils.isCompatibleDataType(new BigDecimal("1.2345678901234567890"), dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(new BigInteger("12345678901234567890"), dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(1234567890123456789L, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(1, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType((byte) 1, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType((short) 1, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType("1.2345678901234567890", dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(3.1F, dataType));
+        assertTrue(DataTypeUtils.isCompatibleDataType(3.0D, dataType));
+        assertFalse(DataTypeUtils.isCompatibleDataType("1234567XYZ", dataType));
+        assertFalse(DataTypeUtils.isCompatibleDataType(new Long[]{1L, 2L}, dataType));
+    }
+
+    @Test
+    public void testInferDataTypeWithBigDecimal() {
+        assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(3, 1), DataTypeUtils.inferDataType(BigDecimal.valueOf(12.3D), null));
+    }
+
+    @Test
+    public void testIsBigDecimalTypeCompatible() {
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible((byte) 13));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible((short) 13));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12L));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigInteger.valueOf(12L)));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123F));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123D));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigDecimal.valueOf(12.123D)));
+        assertTrue(DataTypeUtils.isDecimalTypeCompatible("123"));
+
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible(null));
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible("test"));
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible(new ArrayList<>()));
+        // Decimal handling does not support NaN and Infinity as the underlying BigDecimal is unable to parse
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible("NaN"));
+        assertFalse(DataTypeUtils.isDecimalTypeCompatible("Infinity"));
+    }
+
+    @Test
+    public void testGetSQLTypeValueWithBigDecimal() {
+        assertEquals(Types.NUMERIC, DataTypeUtils.getSQLTypeValue(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+    }
+
+    @Test
+    public void testChooseDataTypeWhenExpectedIsBigDecimal() {
+        // GIVEN
+        final List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType(),
+                RecordFieldType.DECIMAL.getDecimalDataType(2, 1),
+                RecordFieldType.DECIMAL.getDecimalDataType(20, 10)
+        );
+
+        final Object value = new BigDecimal("1.2");
+        final DataType expected = RecordFieldType.DECIMAL.getDecimalDataType(2, 1);
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
     public void testFloatingPointCompatibility() {
         final String[] prefixes = new String[] {"", "-", "+"};
         final String[] exponents = new String[] {"e0", "e1", "e-1", "E0", "E1", "E-1"};
@@ -504,6 +614,11 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testFindMostSuitableTypeWithBigDecimal() {
+        testFindMostSuitableType(BigDecimal.valueOf(123.456D), RecordFieldType.DECIMAL.getDecimalDataType(6, 3));
+    }
+
+    @Test
     public void testFindMostSuitableTypeWithFloat() {
         testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType());
     }
@@ -580,6 +695,27 @@ public class TestDataTypeUtils {
         });
     }
 
+    private void whenExpectingValidBigDecimalConversion(final BigDecimal expectedValue, final Object incomingValue) {
+        // Checking indirect conversion
+        final String failureMessage = "Conversion from " + incomingValue.getClass().getSimpleName() + " to " + expectedValue.getClass().getSimpleName() + " failed, when ";
+        final BigDecimal indirectResult = whenExpectingValidConversion(expectedValue, incomingValue, RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+        // In some cases a direct equality check yields a false negative, because converting changes the
+        // representation in insignificant ways that break equals(). For example, 12F is represented as "12.0".
+        assertEquals(failureMessage + "indirect", 0, expectedValue.compareTo(indirectResult));
+
+        // Checking direct conversion
+        final BigDecimal directResult = DataTypeUtils.toBigDecimal(incomingValue, "field");
+        assertEquals(failureMessage + "direct", 0, expectedValue.compareTo(directResult));
+
+    }
+
+    private <T> T whenExpectingValidConversion(final T expectedValue, final Object incomingValue, final DataType dataType) {
+        final Object result = DataTypeUtils.convertType(incomingValue, dataType, null, StandardCharsets.UTF_8);
+        assertNotNull(result);
+        assertTrue(expectedValue.getClass().isInstance(result));
+        return (T) result;
+    }
+
     @Test
     public void testIsIntegerFitsToFloat() {
         final int maxRepresentableInt = Double.valueOf(Math.pow(2, 24)).intValue();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index f1c45a0..2de4281 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -694,6 +694,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
                     generator.writeNumber((BigInteger) coercedValue);
                 }
                 break;
+            case DECIMAL:
+                generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
+                break;
             case BOOLEAN:
                 final String stringValue = coercedValue.toString();
                 if ("true".equalsIgnoreCase(stringValue)) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index afe2055..fa133b4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -41,6 +41,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.ConnectException;
 import java.sql.Date;
 import java.sql.Time;
@@ -664,9 +665,10 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("date", RecordFieldType.DATE);
         parser.addSchemaField("time", RecordFieldType.TIME);
         parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
+        parser.addSchemaField("amount", RecordFieldType.DECIMAL);
 
         for(int i=1; i<=numRecords; i++) {
-            parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+            parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L), new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 950e4cb..f54d329 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -256,6 +257,11 @@ public class AvroTypeUtil {
             case LONG:
                 schema = Schema.create(Type.LONG);
                 break;
+            case DECIMAL:
+                final DecimalDataType decimalDataType = (DecimalDataType) dataType;
+                schema = Schema.create(Type.BYTES);
+                LogicalTypes.decimal(decimalDataType.getPrecision(), decimalDataType.getScale()).addToSchema(schema);
+                break;
             case MAP:
                 schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, false));
                 break;
@@ -341,9 +347,8 @@ public class AvroTypeUtil {
                 case LOGICAL_TYPE_TIMESTAMP_MICROS:
                     return RecordFieldType.TIMESTAMP.getDataType();
                 case LOGICAL_TYPE_DECIMAL:
-                    // We convert Decimal to Double.
-                    // Alternatively we could convert it to String, but numeric type is generally more preferable by users.
-                    return RecordFieldType.DOUBLE.getDataType();
+                    final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+                    return RecordFieldType.DECIMAL.getDecimalDataType(decimal.getPrecision(), decimal.getScale());
             }
         }
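
With this change, an Avro decimal logical type is surfaced as a DECIMAL record
type carrying the schema's declared precision and scale, instead of being
flattened to DOUBLE. A hedged sketch, assuming AvroTypeUtil.determineDataType is
the public entry point the hunk above sits in:

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.nifi.avro.AvroTypeUtil;
    import org.apache.nifi.serialization.record.DataType;

    public class AvroDecimalMappingDemo {
        public static void main(String[] args) {
            // An Avro "bytes" schema carrying the decimal logical type (precision 18,
            // scale 8), matching the schema built in TestAvroTypeUtil below.
            final Schema fieldSchema = Schema.create(Schema.Type.BYTES);
            LogicalTypes.decimal(18, 8).addToSchema(fieldSchema);

            // Previously this returned RecordFieldType.DOUBLE.getDataType();
            // now it yields DECIMAL with the declared precision and scale.
            final DataType dataType = AvroTypeUtil.determineDataType(fieldSchema);
            System.out.println(dataType); // DECIMAL[18,8]
        }
    }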
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
index 1b80886..1c12926 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumSet;
@@ -60,6 +61,9 @@ public class InferenceSchemaStrategy implements JsonSchemaAccessStrategy {
                 field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType());
             } else if (entry.getValue() instanceof Date) {
                 field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType());
+            } else if (entry.getValue() instanceof BigDecimal) {
+                final BigDecimal bigDecimal = (BigDecimal) entry.getValue();
+                field = new RecordField(entry.getKey(), RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale()));
             } else if (entry.getValue() instanceof List) {
                 field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType());
             } else if (entry.getValue() instanceof Map) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 412d573..6cc6f4f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -78,6 +78,7 @@ public class TestAvroTypeUtil {
         fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType()));
         fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType()));
         fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType()));
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
         fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
         fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
         fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
@@ -117,6 +118,7 @@ public class TestAvroTypeUtil {
         assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get());
         assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get());
         assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get());
+        assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), afterConversion.getDataType("decimal").get());
         assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get());
         assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get());
         assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get());
@@ -411,7 +413,7 @@ public class TestAvroTypeUtil {
     public void testConvertAvroRecordToMapWithFieldTypeOfFixedAndLogicalTypeDecimal() {
        // Create a field schema like {"type":"fixed","name":"amount","size":16,"logicalType":"decimal","precision":18,"scale":8}
        final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
-        final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);;
+        final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);
         decimalType.addToSchema(fieldSchema);
 
         // Create a field named "amount" using the field schema above
@@ -421,24 +423,50 @@ public class TestAvroTypeUtil {
         final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
 
         // Create an example Avro record with the amount field of type fixed and a logical type of decimal
-        final BigDecimal expectedDecimalValue = new BigDecimal("1234567890.12345678");
+        final BigDecimal expectedBigDecimal = new BigDecimal("1234567890.12345678");
         final GenericRecord genericRecord = new GenericData.Record(avroSchema);
-        genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedDecimalValue, fieldSchema, decimalType));
+        genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedBigDecimal, fieldSchema, decimalType));
 
         // Convert the Avro schema to a Record schema
+        thenConvertAvroSchemaToRecordSchema(avroSchema, expectedBigDecimal, genericRecord);
+    }
+
+    @Test
+    public void testConvertAvroRecordToMapWithFieldTypeOfBinaryAndLogicalTypeDecimal() {
+        // Create a field schema like {"type":"binary","name":"amount","logicalType":"decimal","precision":18,"scale":8}
+        final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
+        final Schema fieldSchema = Schema.create(Type.BYTES);
+        decimalType.addToSchema(fieldSchema);
+
+        // Create a field named "amount" using the field schema above
+        final Schema.Field field = new Schema.Field("amount", fieldSchema, null, (Object)null);
+
+        // Create an overall record schema with the amount field
+        final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
+
+        // Create an example Avro record with the amount field of type bytes and a logical type of decimal
+        final BigDecimal expectedBigDecimal = new BigDecimal("1234567890.12345678");
+        final GenericRecord genericRecord = new GenericData.Record(avroSchema);
+        genericRecord.put("amount", new Conversions.DecimalConversion().toBytes(expectedBigDecimal, fieldSchema, decimalType));
+
+        // Convert the Avro schema to a Record schema
+        thenConvertAvroSchemaToRecordSchema(avroSchema, expectedBigDecimal, genericRecord);
+    }
+
+    private void thenConvertAvroSchemaToRecordSchema(Schema avroSchema, BigDecimal expectedBigDecimal, GenericRecord genericRecord) {
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
 
         // Convert the Avro record to a Map and verify the object produced is the same BigDecimal that was originally encoded
-        final Map<String,Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
+        final Map<String, Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
         assertNotNull(convertedMap);
         assertEquals(1, convertedMap.size());
 
         final Object resultObject = convertedMap.get("amount");
         assertNotNull(resultObject);
-        assertTrue(resultObject instanceof Double);
+        assertTrue(resultObject instanceof BigDecimal);
 
-        final Double resultDouble = (Double) resultObject;
-        assertEquals(Double.valueOf(expectedDecimalValue.doubleValue()), resultDouble);
+        final BigDecimal resultBigDecimal = (BigDecimal) resultObject;
+        assertEquals(expectedBigDecimal, resultBigDecimal);
     }
 
     @Test
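
The two tests above cover both physical encodings Avro allows for the decimal logical type, fixed and bytes; before this change the converted value came back as a lossy Double. A self-contained sketch of the bytes round trip through the same Avro Conversions API the tests use (the class name is hypothetical):

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;

    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class AvroDecimalRoundTrip {
        public static void main(String[] args) {
            final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
            final Schema fieldSchema = Schema.create(Schema.Type.BYTES);
            decimalType.addToSchema(fieldSchema);

            // Encode to Avro's physical representation and decode it again.
            // AvroTypeUtil.convertAvroRecordToMap now performs the decode step
            // itself and returns the exact BigDecimal instead of a Double.
            final Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
            final BigDecimal original = new BigDecimal("1234567890.12345678");
            final ByteBuffer encoded = conversion.toBytes(original, fieldSchema, decimalType);
            final BigDecimal decoded = conversion.fromBytes(encoded, fieldSchema, decimalType);
            System.out.println(original.equals(decoded)); // true
        }
    }
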
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
new file mode 100644
index 0000000..e2d82a9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class InferenceSchemaStrategyTest {
+
+    private static final Object[][] CONTENT_FIELDS = new Object[][] {
+            {"integer", 1, RecordFieldType.INT.getDataType()},
+            {"long", 1L, RecordFieldType.LONG.getDataType()},
+            {"boolean", true, RecordFieldType.BOOLEAN.getDataType()},
+            {"double", 1D, RecordFieldType.DOUBLE.getDataType()},
+            {"date", new Date(), RecordFieldType.DATE.getDataType()},
+            {"decimal", BigDecimal.valueOf(123.456D), RecordFieldType.DECIMAL.getDecimalDataType(6, 3)},
+            {"array", new ArrayList<String>(), RecordFieldType.ARRAY.getDataType()},
+
+            // date subclasses
+            {"time", new Time(System.currentTimeMillis()), RecordFieldType.DATE.getDataType()},
+            {"timestamp", new Timestamp(System.currentTimeMillis()), RecordFieldType.DATE.getDataType()},
+
+            // other types are treated as string
+            {"byte", (byte) 1, RecordFieldType.STRING.getDataType()},
+            {"short", (short) 1, RecordFieldType.STRING.getDataType()},
+            {"bigint", BigInteger.ONE, RecordFieldType.STRING.getDataType()},
+            {"float", (float) 1, RecordFieldType.STRING.getDataType()},
+            {"char", (char) 1, RecordFieldType.STRING.getDataType()},
+    };
+
+    private final InferenceSchemaStrategy testSubject = new InferenceSchemaStrategy();
+
+    @Test
+    public void testSchemaConversion() throws Exception {
+        // when
+        final RecordSchema result = testSubject.getSchema(null, givenContent(), null);
+
+        // then
+        Assert.assertNotNull(result);
+        thenFieldsAreConvertedProperly(result, true);
+    }
+
+    @Test
+    public void testSchemaConversionWhenMap() throws Exception {
+        // given
+        final Map<String, Object> input = new HashMap<>();
+        final Map<String, Integer> field = new HashMap<>();
+        field.put("a1", 1);
+        field.put("a2", 2);
+        input.put("f1", field);
+
+        // when
+        final RecordSchema result = testSubject.getSchema(null, input, null);
+
+        // then
+        Assert.assertNotNull(result);
+        Assert.assertTrue(RecordDataType.class.isInstance(result.getField("f1").get().getDataType()));
+        final RecordDataType recordDataType = (RecordDataType) result.getField("f1").get().getDataType();
+
+        final RecordSchema childSchema = recordDataType.getChildSchema();
+        Assert.assertNotNull(childSchema);
+        Assert.assertEquals(RecordFieldType.INT.getDataType(), childSchema.getField("a1").get().getDataType());
+        Assert.assertEquals(RecordFieldType.INT.getDataType(), childSchema.getField("a2").get().getDataType());
+    }
+
+    @Test
+    public void testSchemaConversionFromJsonString() throws Exception {
+        // given
+        final String json = "{\"double\":1.0,\"integer\":1,\"long\":9223372036854775,\"boolean\":true,\"array\":[]}";
+
+        // when
+        final RecordSchema result = testSubject.getSchema(null, new ByteArrayInputStream(json.getBytes()), null);
+
+        // then
+        Assert.assertNotNull(result);
+        thenFieldsAreConvertedProperly(result, false);
+    }
+
+    private Map<String, Object> givenContent() {
+        final HashMap<String, Object> result = new HashMap<>();
+
+        for (final Object[] contentField : CONTENT_FIELDS) {
+            result.put((String) contentField[0], contentField[1]);
+        }
+
+        return result;
+    }
+
+    private Map<String, DataType> givenExpected() {
+        final HashMap<String, DataType> result = new HashMap<>();
+
+        for (final Object[] contentField : CONTENT_FIELDS) {
+            result.put((String) contentField[0], (DataType) contentField[2]);
+        }
+
+        return result;
+    }
+
+    private void thenFieldsAreConvertedProperly(final RecordSchema result, final boolean mustPresent) {
+        final List<RecordField> fields = result.getFields();
+
+        for (final Map.Entry<String, DataType> expected : givenExpected().entrySet()) {
+            final Optional<RecordField> field = fields.stream().filter(f -> f.getFieldName().equals(expected.getKey())).findFirst();
+
+            if (field.isPresent()) {
+                Assert.assertEquals("\"" + expected.getKey() + "\" is expected to be converted " + expected.getValue().toString(), expected.getValue(), field.get().getDataType());
+            } else if (mustPresent) {
+                Assert.fail();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
index 76e7b7d..18b7c7a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
@@ -32,11 +32,10 @@ import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
 import org.apache.nifi.serialization.record.validation.ValidationError;
 import org.apache.nifi.serialization.record.validation.ValidationErrorType;
 
+import java.math.BigInteger;
 import java.util.Map;
 
 public class StandardSchemaValidator implements RecordSchemaValidator {
-
-
     private final SchemaValidationContext validationContext;
 
     public StandardSchemaValidator(final SchemaValidationContext validationContext) {
@@ -273,6 +272,13 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
                         || DataTypeUtils.isIntegerFitsToFloat(value)
                         || DataTypeUtils.isLongFitsToFloat(value)
                         || DataTypeUtils.isBigIntFitsToFloat(value);
+            case DECIMAL:
+                return DataTypeUtils.isFittingNumberType(value, dataType.getFieldType())
+                        || value instanceof Byte
+                        || value instanceof Short
+                        || value instanceof Integer
+                        || value instanceof Long
+                        || value instanceof BigInteger;
         }
 
         return false;
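
In effect a DECIMAL field now validates BigDecimal, float and double values through DataTypeUtils.isFittingNumberType (the tests below feed all three) and additionally accepts every whole-number type, since those can always be represented exactly. A simplified standalone predicate for the whole-number half (class and method names are invented):

    import java.math.BigDecimal;
    import java.math.BigInteger;

    public final class DecimalAcceptanceSketch {
        // Mirrors the instanceof chain in the DECIMAL case above.
        static boolean isWholeNumber(final Object value) {
            return value instanceof Byte
                    || value instanceof Short
                    || value instanceof Integer
                    || value instanceof Long
                    || value instanceof BigInteger;
        }

        public static void main(String[] args) {
            System.out.println(isWholeNumber(Long.MAX_VALUE));         // true
            System.out.println(isWholeNumber(BigInteger.valueOf(5L))); // true
            System.out.println(isWholeNumber(new BigDecimal("8.1")));  // false, goes through isFittingNumberType
        }
    }
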
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
index dabfa9f..3f4e102 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
@@ -67,7 +68,8 @@ public class TestStandardSchemaValidator {
             RecordFieldType.LONG,
             RecordFieldType.BIGINT,
             RecordFieldType.FLOAT,
-            RecordFieldType.DOUBLE
+            RecordFieldType.DOUBLE,
+            RecordFieldType.DECIMAL
     ));
 
     @Test
@@ -117,6 +119,7 @@ public class TestStandardSchemaValidator {
         valueMap.put("short", (short) 8);
         valueMap.put("int", 9);
         valueMap.put("bigint", BigInteger.valueOf(8L));
+        valueMap.put("decimal", BigDecimal.valueOf(8.1D));
         valueMap.put("long", 8L);
         valueMap.put("float", 8.0F);
         valueMap.put("double", 8.0D);
@@ -145,6 +148,9 @@ public class TestStandardSchemaValidator {
 
         valueMap.put("float_as_double", 8.0F);
 
+        valueMap.put("float_as_decimal", 8.0F);
+        valueMap.put("double_as_decimal", 8.0D);
+
         final Record record = new MapRecord(schema, valueMap);
 
         final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true);
@@ -170,18 +176,21 @@ public class TestStandardSchemaValidator {
     public void testByteIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.DECIMAL);
     }
 
     @Test
     public void testShortIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.DECIMAL);
     }
 
     @Test
     public void testIntegerWithinRangeIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_FLOAT.intValue(), RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Integer.MAX_VALUE, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Integer.MAX_VALUE, RecordFieldType.DECIMAL);
     }
 
     @Test
@@ -194,6 +203,7 @@ public class TestStandardSchemaValidator {
     public void testLongWithinRangeIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_FLOAT, RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_DOUBLE, RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Long.MAX_VALUE, RecordFieldType.DECIMAL);
     }
 
     @Test
@@ -206,6 +216,7 @@ public class TestStandardSchemaValidator {
     public void testBigintWithinRangeIsConsideredToBeValidFloatingPoint() {
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(5L), RecordFieldType.FLOAT);
         whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(5L), RecordFieldType.DOUBLE);
+        whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(Long.MAX_VALUE), RecordFieldType.DECIMAL);
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index ce06f82..b782534 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -22,8 +22,10 @@ import org.apache.avro.util.Utf8;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
@@ -44,6 +46,7 @@ import org.apache.hadoop.io.Text;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -104,6 +107,9 @@ public class NiFiOrcUtils {
             if (o instanceof Double) {
                 return new DoubleWritable((double) o);
             }
+            if (o instanceof BigDecimal) {
+                return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o));
+            }
             if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
                 return new Text(o.toString());
             }
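
The ORC path is a direct wrap into Hive's decimal types; a minimal sketch of the two-step conversion the new branch performs (both classes are the ones imported above):

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;

    public class OrcDecimalConversionSketch {
        public static void main(String[] args) {
            final BigDecimal value = new BigDecimal("1234.56");
            // HiveDecimal normalizes the value to Hive's decimal semantics
            // (maximum precision 38); the writable wraps it for the ORC writer.
            final HiveDecimal hiveDecimal = HiveDecimal.create(value);
            final HiveDecimalWritable writable = new HiveDecimalWritable(hiveDecimal);
            System.out.println(writable.getHiveDecimal().bigDecimalValue()); // 1234.56
        }
    }
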
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index cd7847f..74cdd2a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -22,6 +22,7 @@ import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -33,6 +34,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -199,6 +201,7 @@ public class TestNiFiOrcUtils {
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L) instanceof LongWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f) instanceof FloatWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0) instanceof DoubleWritable);
+        assertTrue(NiFiOrcUtils.convertToORCObject(null, BigDecimal.valueOf(1.0D)) instanceof HiveDecimalWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}) instanceof List);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3)) instanceof List);
         Map<String, Float> map = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index f726010..8af55d5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hive.ql.io.orc;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -45,6 +47,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.orc.MemoryManager;
@@ -52,6 +55,7 @@ import org.apache.orc.OrcConf;
 import org.apache.orc.impl.MemoryManagerImpl;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -104,6 +108,9 @@ public class NiFiOrcUtils {
             if (o instanceof Double) {
                 return new DoubleWritable((double) o);
             }
+            if (o instanceof BigDecimal) {
+                return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o));
+            }
             if (o instanceof String) {
                 return new Text(o.toString());
             }
@@ -286,6 +293,11 @@ public class NiFiOrcUtils {
                 || RecordFieldType.STRING.equals(fieldType)) {
             return getPrimitiveOrcTypeFromPrimitiveFieldType(dataType);
         }
+
+        if (RecordFieldType.DECIMAL.equals(fieldType)) {
+            DecimalDataType decimalDataType = (DecimalDataType) dataType;
+            return TypeInfoFactory.getDecimalTypeInfo(decimalDataType.getPrecision(), decimalDataType.getScale());
+        }
         if (RecordFieldType.DATE.equals(fieldType)) {
             return TypeInfoFactory.dateTypeInfo;
         }
@@ -407,6 +419,9 @@ public class NiFiOrcUtils {
         if (RecordFieldType.FLOAT.equals(dataType)) {
             return "FLOAT";
         }
+        if (RecordFieldType.DECIMAL.equals(dataType)) {
+            return "DECIMAL";
+        }
         if (RecordFieldType.STRING.equals(dataType)) {
             return "STRING";
         }
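
In the hive3 bundle the NiFi decimal type also maps onto a Hive TypeInfo, with precision and scale carried over one-to-one; the dedicated test added further down asserts exactly this mapping. A sketch (the class name is invented):

    import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.type.DecimalDataType;

    public class DecimalTypeInfoSketch {
        public static void main(String[] args) {
            final DecimalDataType dataType =
                    (DecimalDataType) RecordFieldType.DECIMAL.getDecimalDataType(18, 8);
            // The same call the new getOrcField branch makes:
            final DecimalTypeInfo typeInfo =
                    TypeInfoFactory.getDecimalTypeInfo(dataType.getPrecision(), dataType.getScale());
            System.out.println(typeInfo); // decimal(18,8)
        }
    }

Note that the DDL helper above returns a bare "DECIMAL", which Hive interprets as DECIMAL(10,0) by default; the PutORCTest change below shows the resulting CREATE TABLE statement.
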
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index a6d0214..b8ed6c2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -66,6 +67,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -75,7 +77,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoField;
-import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -126,7 +127,7 @@ public class PutORCTest {
 
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
         for (final RecordField recordField : recordSchema.getFields()) {
-            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+            readerFactory.addSchemaField(recordField);
         }
 
         if (recordGenerator == null) {
@@ -222,16 +223,14 @@ public class PutORCTest {
     public void testWriteORCWithAvroLogicalTypes() throws IOException, InitializationException {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_logical_types.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);
-        Calendar now = Calendar.getInstance();
         LocalTime nowTime = LocalTime.now();
         LocalDateTime nowDateTime = LocalDateTime.now();
-        LocalDate epoch = LocalDate.ofEpochDay(0);
         LocalDate nowDate = LocalDate.now();
 
         final int timeMillis = nowTime.get(ChronoField.MILLI_OF_DAY);
         final Timestamp timestampMillis = Timestamp.valueOf(nowDateTime);
         final Date dt = Date.valueOf(nowDate);
-        final double dec = 1234.56;
+        final BigDecimal bigDecimal = new BigDecimal("92.12");
 
         configure(proc, 10, (numUsers, readerFactory) -> {
             for (int i = 0; i < numUsers; i++) {
@@ -240,7 +239,7 @@ public class PutORCTest {
                         timeMillis,
                         timestampMillis,
                         dt,
-                        dec);
+                        bigDecimal);
             }
             return null;
         });
@@ -265,7 +264,7 @@ public class PutORCTest {
         mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
         // DDL will be created with field names normalized (lowercased, e.g.) for Hive by default
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
-                "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DOUBLE) STORED AS ORC");
+                "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED AS ORC");
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
@@ -285,7 +284,7 @@ public class PutORCTest {
                     final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                     noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
                     assertEquals(noTimeOfDayDateFormat.format(dt), ((DateWritableV2) x.get(3)).get().toString());
-                    assertEquals(dec, ((DoubleWritable) x.get(4)).get(), Double.MIN_VALUE);
+                    assertEquals(bigDecimal, ((HiveDecimalWritable) x.get(4)).getHiveDecimal().bigDecimalValue());
                     return null;
                 }
         );
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index bde6af8..19580ea 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -21,7 +21,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -31,13 +33,16 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -170,6 +175,20 @@ public class TestNiFiOrcUtils {
         assertEquals(TypeInfoCreator.createString(), orcType);
     }
 
+
+    @Test
+    public void test_getOrcField_decimal() {
+        // given
+        final DecimalTypeInfo expected = TypeInfoFactory.getDecimalTypeInfo(4, 2);
+        final DataType decimalDataType = RecordFieldType.DECIMAL.getDecimalDataType(4, 2);
+
+        // when
+        final TypeInfo orcField = NiFiOrcUtils.getOrcField(decimalDataType, false);
+
+        // then
+        Assert.assertEquals(expected, orcField);
+    }
+
     @Test
     public void test_getPrimitiveOrcTypeFromPrimitiveFieldType() {
         // Expected ORC types
@@ -205,6 +224,7 @@ public class TestNiFiOrcUtils {
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L, true) instanceof LongWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f, true) instanceof FloatWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0, true) instanceof DoubleWritable);
+        assertTrue(NiFiOrcUtils.convertToORCObject(null, BigDecimal.valueOf(1L), true) instanceof HiveDecimalWritable);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}, true) instanceof List);
         assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3), true) instanceof List);
         Map<String, Float> map = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index b044f9a..bc59b12 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.kudu;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.AsyncKuduClient;
@@ -311,12 +312,15 @@ public class KuduLookupService extends AbstractControllerService implements Reco
                 case INT64:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()));
                     break;
+                case DECIMAL:
+                    final ColumnTypeAttributes attributes = cs.getTypeAttributes();
+                    fields.add(new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale())));
+                    break;
                 case UNIXTIME_MICROS:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()));
                     break;
                 case BINARY:
                 case STRING:
-                case DECIMAL:
                     fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
                     break;
                 case DOUBLE:
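
Kudu decimal columns carry precision and scale in their ColumnTypeAttributes, which the new DECIMAL branch reads back into a NiFi decimal type instead of falling through to the old STRING mapping. A sketch of such a column as the lookup service would encounter it (the column name is hypothetical):

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.ColumnTypeAttributes;
    import org.apache.kudu.Type;

    public class KuduDecimalColumnSketch {
        public static void main(String[] args) {
            final ColumnSchema column = new ColumnSchema.ColumnSchemaBuilder("amount", Type.DECIMAL)
                    .typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                            .precision(18)
                            .scale(8)
                            .build())
                    .build();
            // The branch above turns these attributes into getDecimalDataType(18, 8):
            final ColumnTypeAttributes attributes = column.getTypeAttributes();
            System.out.println(attributes.getPrecision() + ", " + attributes.getScale()); // 18, 8
        }
    }
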
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index c41c8d5..b9639e5 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -18,8 +18,10 @@
 package org.apache.nifi.processors.kudu;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.AsyncKuduClient;
 import org.apache.kudu.client.Delete;
 import org.apache.kudu.client.Insert;
@@ -53,6 +55,8 @@ import org.apache.nifi.security.krb.KerberosPasswordUser;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StringUtils;
 
@@ -369,7 +373,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
     /**
      * Converts a NiFi DataType to it's equivalent Kudu Type.
      */
-    protected Type toKuduType(DataType nifiType) {
+    private Type toKuduType(DataType nifiType) {
         switch (nifiType.getFieldType()) {
             case BOOLEAN:
                 return Type.BOOL;
@@ -385,6 +389,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                 return Type.FLOAT;
             case DOUBLE:
                 return Type.DOUBLE;
+            case DECIMAL:
+                return Type.DECIMAL;
             case TIMESTAMP:
                 return Type.UNIXTIME_MICROS;
             case CHAR:
@@ -395,6 +401,36 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         }
     }
 
+    private ColumnTypeAttributes getKuduTypeAttributes(final DataType nifiType) {
+        if (nifiType.getFieldType().equals(RecordFieldType.DECIMAL)) {
+            final DecimalDataType decimalDataType = (DecimalDataType) nifiType;
+            return new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(decimalDataType.getPrecision()).scale(decimalDataType.getScale()).build();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Based on the NiFi field declaration, generates an alter statement that extends the table with a new column. Note: simply calling
+     * {@link AlterTableOptions#addNullableColumn(String, Type)} is not sufficient as it does not cover BigDecimal scale and precision handling.
+     *
+     * @param columnName Name of the new table column.
+     * @param nifiType Type of the field.
+     *
+     * @return Alter table statement that extends the table with the new field.
+     */
+    protected AlterTableOptions getAddNullableColumnStatement(final String columnName, final DataType nifiType) {
+        final AlterTableOptions alterTable = new AlterTableOptions();
+
+        alterTable.addColumn(new ColumnSchema.ColumnSchemaBuilder(columnName, toKuduType(nifiType))
+                .nullable(true)
+                .defaultValue(null)
+                .typeAttributes(getKuduTypeAttributes(nifiType))
+                .build());
+
+        return alterTable;
+    }
+
     private int getColumnIndex(Schema columns, String colName) {
         try {
             return columns.getColumnIndex(colName);
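
PutKudu's schema-drift handling (next file) switches to this helper because AlterTableOptions#addNullableColumn(String, Type) offers no way to pass precision and scale. Roughly what the helper assembles for a hypothetical decimal(6,3) field:

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.ColumnTypeAttributes;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.AlterTableOptions;

    public class SchemaDriftAlterSketch {
        public static AlterTableOptions addNullableDecimalColumn(final String columnName) {
            final AlterTableOptions alterTable = new AlterTableOptions();
            alterTable.addColumn(new ColumnSchema.ColumnSchemaBuilder(columnName, Type.DECIMAL)
                    .nullable(true)
                    .defaultValue(null)
                    // The type attributes are what addNullableColumn cannot express:
                    .typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                            .precision(6)
                            .scale(3)
                            .build())
                    .build());
            return alterTable;
        }
    }
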
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 2ac0195..c0d7e46 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -18,11 +18,10 @@
 package org.apache.nifi.processors.kudu;
 
 import org.apache.kudu.Schema;
-import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.OperationResponse;
 import org.apache.kudu.client.RowError;
@@ -325,10 +324,8 @@ public class PutKudu extends AbstractKuduProcessor {
                         // we created by a concurrent thread or application attempting to handle schema drift.
                         for (RecordField field : missing) {
                             try {
-                                String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
-                                AlterTableOptions alter = new AlterTableOptions();
-                                alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
-                                kuduClient.alterTable(tableName, alter);
+                                final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+                                kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
                             } catch (KuduException e) {
                                 // Ignore the exception if the column already exists due to concurrent
                                 // threads or applications attempting to handle schema drift.
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 52ab00d..4f634fd 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kudu;
 
+import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
@@ -37,6 +38,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -47,6 +49,9 @@ public class MockPutKudu extends PutKudu {
     private KuduSession session;
     private LinkedList<Insert> insertQueue;
 
+    // An atomic reference is used because the schema is set and read on different threads.
+    private AtomicReference<Schema> tableSchema = new AtomicReference<>();
+
     private boolean loggedIn = false;
     private boolean loggedOut = false;
 
@@ -102,7 +107,9 @@ public class MockPutKudu extends PutKudu {
         final KuduClient client = mock(KuduClient.class);
 
         try {
-            when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
+            final KuduTable kuduTable = mock(KuduTable.class);
+            when(client.openTable(anyString())).thenReturn(kuduTable);
+            when(kuduTable.getSchema()).thenReturn(tableSchema.get());
         } catch (final Exception e) {
             throw new AssertionError(e);
         }
@@ -173,7 +180,11 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected KuduSession createKuduSession(KuduClient client) {
+    protected KuduSession createKuduSession(final KuduClient client) {
         return session;
     }
+
+    void setTableSchema(final Schema tableSchema) {
+        this.tableSchema.set(tableSchema);
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 303a03b..97e3e3d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -25,9 +25,9 @@ import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowError;
 import org.apache.kudu.client.RowErrorsAndOverflowStatus;
-import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
@@ -60,6 +60,7 @@ import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -85,7 +86,7 @@ public class TestPutKudu {
     public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
     public static final String DEFAULT_MASTERS = "testLocalHost:7051";
     public static final String SKIP_HEAD_LINE = "false";
-    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal";
+    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal";
 
     private TestRunner testRunner;
 
@@ -122,9 +123,10 @@ public class TestPutKudu {
         readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
         readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
         readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+        readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3)));
 
         for (int i=0; i < numOfRecord; i++) {
-            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, new BigDecimal(111.111D).add(BigDecimal.valueOf(i)));
         }
 
         testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -260,6 +262,26 @@ public class TestPutKudu {
     }
 
     @Test
+    public void testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed() throws InitializationException, IOException {
+        // given
+        processor.setTableSchema(new Schema(Arrays.asList()));
+        createRecordReader(5);
+        final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true");
+        testRunner.enqueue("trigger", flowFileAttributes);
+
+        // when
+        testRunner.run();
+
+        // then
+        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
         createRecordReader(10);
 
@@ -288,7 +310,7 @@ public class TestPutKudu {
     public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
         createRecordReader(0);
         // add the numeric field values as strings
-        readerFactory.addRecord(1, "name0", "0", "89.89");
+        readerFactory.addRecord(1, "name0", "0", "89.89", "111.111");
 
         final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();
 
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 3a4bd20..91fd7c2 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -207,6 +207,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
                 || RecordFieldType.BIGINT.equals(dataType)
                 || RecordFieldType.FLOAT.equals(dataType)
                 || RecordFieldType.DOUBLE.equals(dataType)
+                || RecordFieldType.DECIMAL.equals(dataType)
                 || RecordFieldType.BOOLEAN.equals(dataType);
 
     }
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 02a54b9..34bb657 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -29,8 +29,8 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.ListRecordSet;
@@ -47,6 +47,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Arrays;
@@ -71,17 +72,20 @@ public class TestPrometheusRecordSink {
 
         List<RecordField> recordFields = Arrays.asList(
                 new RecordField("field1", RecordFieldType.INT.getDataType()),
-                new RecordField("field2", RecordFieldType.STRING.getDataType())
+                new RecordField("field2", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)),
+                new RecordField("field3", RecordFieldType.STRING.getDataType())
         );
         RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
 
         Map<String, Object> row1 = new HashMap<>();
         row1.put("field1", 15);
-        row1.put("field2", "Hello");
+        row1.put("field2", BigDecimal.valueOf(12.34567D));
+        row1.put("field3", "Hello");
 
         Map<String, Object> row2 = new HashMap<>();
         row2.put("field1", 6);
-        row2.put("field2", "World!");
+        row2.put("field2", BigDecimal.valueOf(0.1234567890123456789D));
+        row2.put("field3", "World!");
 
         RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
                 new MapRecord(recordSchema, row1),
@@ -95,8 +99,10 @@ public class TestPrometheusRecordSink {
         assertEquals(2, writeResult.getRecordCount());
         assertEquals("Hello", writeResult.getAttributes().get("a"));
 
+
         final String content = getMetrics();
-        assertTrue(content.contains("field1{field2=\"Hello\",} 15.0\nfield1{field2=\"World!\",} 6.0\n"));
+        assertTrue(content.contains("field1{field3=\"Hello\",} 15.0\nfield1{field3=\"World!\",} 6.0\n"));
+        assertTrue(content.contains("field2{field3=\"Hello\",} 12.34567\nfield2{field3=\"World!\",} 0.12345678901234568\n"));
 
         try {
             sink.onStopped();
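
Counting DECIMAL among the numeric types means such fields are exported as Prometheus gauges. Gauge values are doubles, so anything beyond double precision is rounded on export, which is what the second expected metrics line above reflects; a one-line illustration:

    import java.math.BigDecimal;

    public class PrometheusDecimalSketch {
        public static void main(String[] args) {
            // 19 significant digits cannot survive the narrowing to double:
            final BigDecimal exact = new BigDecimal("0.1234567890123456789");
            System.out.println(exact.doubleValue()); // 0.12345678901234568
        }
    }
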
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
index 789ab00..55c0de4 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
@@ -122,6 +122,11 @@ public class RecordSinkHandler extends AbstractActionHandlerService{
             if (value.contains(".")) {
                 try {
                     final double doubleValue = Double.parseDouble(value);
+
+                    if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+                        return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
+                    }
+
                     if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
                         return RecordFieldType.DOUBLE.getDataType();
                     }
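
Double.parseDouble returns a signed infinity when the literal overflows the double range; in that case the handler now sizes a DECIMAL directly from the text, taking every character except the dot as the precision and the digits after the dot as the scale. Worked through for the 400-digit value the test below constructs:

    import java.util.Collections;

    public class DecimalFromOverflowSketch {
        public static void main(String[] args) {
            // 400 ones followed by ".2" is far outside the double range...
            final String value = String.join("", Collections.nCopies(400, "1")) + ".2";
            System.out.println(Double.parseDouble(value)); // Infinity

            // ...so precision and scale are derived from the string instead:
            final int precision = value.length() - 1;                  // 401 digits, dot excluded
            final int scale = value.length() - 1 - value.indexOf("."); // 1 fractional digit
            System.out.println(precision + ", " + scale); // 401, 1
        }
    }
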
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
index 8566a2a..bd11534 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
@@ -34,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -79,10 +80,12 @@ public class TestRecordSinkHandler {
         final Map<String,String> attributes = new HashMap<>();
         final Map<String,Object> metrics = new HashMap<>();
         final String expectedMessage = "Records written to sink service:";
+        final BigDecimal bigDecimalValue = new BigDecimal(String.join("", Collections.nCopies(400, "1")) + ".2");
 
         attributes.put("sendZeroResults","false");
         metrics.put("jvmHeap","1000000");
         metrics.put("cpu","90");
+        metrics.put("custom", bigDecimalValue);
 
         final Action action = new Action();
         action.setType("SEND");
@@ -96,6 +99,7 @@ public class TestRecordSinkHandler {
         Map<String,Object> record = rows.get(0);
         assertEquals("90", (record.get("cpu")));
         assertEquals("1000000", (record.get("jvmHeap")));
+        assertEquals(bigDecimalValue, (record.get("custom")));
     }
 
     private static class MockRecordSinkHandler extends RecordSinkHandler {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 5f8328f..cf63a70 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -365,6 +365,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
                 case FLOAT:
                 case INT:
                 case BIGINT:
+                case DECIMAL:
                 case LONG:
                 case SHORT:
                 case STRING:
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
index 60fa072..af1f2a4 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -404,7 +404,7 @@ public class SolrUtils {
                 continue;
             } else {
                 final DataType dataType = schema.getDataType(field.getFieldName()).get();
-                writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
+                writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
             }
         }
     }
@@ -462,6 +462,9 @@ public class SolrUtils {
                     addFieldToSolrDocument(inputDocument, fieldName, coercedValue, fieldsToIndex);
                 }
                 break;
+            case DECIMAL:
+                addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toBigDecimal(coercedValue, fieldName), fieldsToIndex);
+                break;
             case BOOLEAN:
                 final String stringValue = coercedValue.toString();
                 if ("true".equalsIgnoreCase(stringValue)) {
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
new file mode 100644
index 0000000..edc678e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SolrUtilsTest {
+
+    @Mock
+    private SolrInputDocument inputDocument;
+
+    @Test
+    public void testWriteRecordWithDecimalField() throws Exception {
+        // given
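+        // 20 integer digits and 30 fraction digits: beyond what a double can represent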
+        final String value = "12345678901234567890.123456789012345678901234567890";
+        final BigDecimal bigDecimalValue = new BigDecimal(value);
+        final List<RecordField> fields = Collections.singletonList(new RecordField("test", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("test", bigDecimalValue);
+
+        final Record record = new MapRecord(new SimpleRecordSchema(fields), values);
+        final List<String> fieldsToIndex = Collections.singletonList("parent_test");
+
+        // when
+        SolrUtils.writeRecord(record, inputDocument, fieldsToIndex, "parent");
+
+        // then
+        Mockito.verify(inputDocument, Mockito.times(1)).addField("parent_test", bigDecimalValue);
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 18cbc63..4f0fec2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -46,6 +46,7 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 
 import java.lang.reflect.Type;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -223,6 +224,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
                 return typeFactory.createJavaType(HashMap.class);
             case BIGINT:
                 return typeFactory.createJavaType(BigInteger.class);
+            case DECIMAL:
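+                // DECIMAL record fields surface to Calcite as java.math.BigDecimal values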
+                return typeFactory.createJavaType(BigDecimal.class);
             case CHOICE:
                 final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
                 DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index d888e3c..f17fe3c 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -133,6 +133,7 @@
                 <configuration>
                     <excludes combine.children="append">
                         <exclude>src/test/resources/avro/datatypes.avsc</exclude>
+                        <exclude>src/test/resources/avro/decimals.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
                         <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
index 746b1ce..f073464 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
@@ -104,6 +104,7 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
             case LONG:
             case FLOAT:
             case DOUBLE:
+            case DECIMAL:
             case BYTE:
             case CHAR:
             case SHORT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index 10b18a9..bc25744 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -89,6 +89,11 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
             if (value.contains(".")) {
                 try {
                     final double doubleValue = Double.parseDouble(value);
+
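+                    // Double.parseDouble turns literals too large for a double into
+                    // +/-Infinity; fall back to DECIMAL, sizing precision and scale
+                    // from the literal's length and decimal-point position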
+                    if (Double.isInfinite(doubleValue)) {
+                        return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
+                    }
+
                     if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
                         return RecordFieldType.DOUBLE.getDataType();
                     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 6526ebe..733d3a5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -156,6 +156,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
         switch (fieldType) {
             case BIGINT:
             case BYTE:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case LONG:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
index 02587cc..10b4129 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
@@ -23,7 +23,9 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.DecimalNode;
 
+import java.math.BigDecimal;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
@@ -62,6 +64,12 @@ public class JsonSchemaInference extends HierarchicalSchemaInference<JsonNode> {
             return RecordFieldType.LONG.getDataType();
         }
 
+        if (jsonNode.isBigDecimal()) {
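+            // Size the DECIMAL type from the parsed value's own precision and scale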
+            final DecimalNode decimalNode = (DecimalNode) jsonNode;
+            final BigDecimal value = decimalNode.getDecimalValue();
+            return RecordFieldType.DECIMAL.getDecimalDataType(value.precision(), value.scale());
+        }
+
         if (jsonNode.isFloatingPointNumber()) {
             return RecordFieldType.DOUBLE.getDataType();
         }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index d0172e9..469eb80 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -160,6 +160,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index dd7eaec..80c8512 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -376,6 +376,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
             case STRING:
                 generator.writeString(coercedValue.toString());
                 break;
+            case DECIMAL:
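+                // BigDecimal is written as a plain JSON number, keeping every digit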
+                generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
+                break;
             case BIGINT:
                 if (coercedValue instanceof Long) {
                     generator.writeNumber(((Long) coercedValue).longValue());
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
index 16a1e9e..8b4710b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
@@ -250,6 +250,7 @@ public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSet
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
index 4a2eacb..2cb165b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
@@ -145,6 +145,7 @@ public class XMLRecordReader implements RecordReader {
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
@@ -528,6 +529,7 @@ public class XMLRecordReader implements RecordReader {
             case BOOLEAN:
             case BYTE:
             case CHAR:
+            case DECIMAL:
             case DOUBLE:
             case FLOAT:
             case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
index 134e9a2..0e2ccab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
@@ -57,6 +57,11 @@ public class XmlSchemaInference extends HierarchicalSchemaInference<XmlNode> {
             if (text.contains(".")) {
                 try {
                     final double doubleValue = Double.parseDouble(text);
+
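+                    // As in the CSV inference, a literal that overflows double parses to
+                    // +/-Infinity; infer a DECIMAL sized from the text itself instead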
+                    if (Double.isInfinite(doubleValue)) {
+                        return RecordFieldType.DECIMAL.getDecimalDataType(text.length() - 1, text.length() - 1 - text.indexOf("."));
+                    }
+
                     if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
                         return RecordFieldType.DOUBLE.getDataType();
                     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index b5fd869..d310d4f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -113,7 +113,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMillis").get().getFieldType());
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMicros").get().getFieldType());
             assertEquals(RecordFieldType.DATE, recordSchema.getDataType("date").get().getFieldType());
-            assertEquals(RecordFieldType.DOUBLE, recordSchema.getDataType("decimal").get().getFieldType());
+            assertEquals(RecordFieldType.DECIMAL, recordSchema.getDataType("decimal").get().getFieldType());
 
             final Record record = reader.nextRecord();
             assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMillis"));
@@ -123,7 +123,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
             noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
             assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
-            assertEquals(bigDecimal.doubleValue(), record.getValue("decimal"));
+            assertEquals(bigDecimal, record.getValue("decimal"));
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 7a35ba5..a25e80e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -33,6 +33,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -139,6 +140,53 @@ public abstract class TestWriteAvroResult {
     }
 
     @Test
+    public void testDecimalType() throws IOException {
+        final Object[][] decimals = new Object[][] {
+                // id, record field, value, expected value
+
+                // Uses the whole precision and scale
+                {1, RecordFieldType.DECIMAL.getDecimalDataType(10, 2),  new BigDecimal("12345678.12"),  new BigDecimal("12345678.12")},
+
+                // Uses less precision and scale than allowed
+                {2, RecordFieldType.DECIMAL.getDecimalDataType(10, 2),  new BigDecimal("123456.1"),  new BigDecimal("123456.10")},
+
+                // Record schema declares smaller precision and scale than the Avro schema
+                {3, RecordFieldType.DECIMAL.getDecimalDataType(8, 1),  new BigDecimal("123456.1"),  new BigDecimal("123456.10")},
+
+                // Record schema declares greater precision and scale than the Avro schema
+                {4, RecordFieldType.DECIMAL.getDecimalDataType(16, 4),  new BigDecimal("123456.1"),  new BigDecimal("123456.10")},
+        };
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/decimals.avsc"));
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final List<RecordField> fields = new ArrayList<>();
+        final Map<String, Object> values = new HashMap<>();
+
+        for (final Object[] decimal : decimals) {
+            fields.add(new RecordField("decimal" + decimal[0], (DataType) decimal[1]));
+            values.put("decimal" + decimal[0], decimal[2]);
+        }
+
+        final Record record = new MapRecord(new SimpleRecordSchema(fields), values);
+
+        try (final RecordSetWriter writer = createWriter(schema, baos)) {
+            writer.write(RecordSet.of(record.getSchema(), record));
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        try (final InputStream in = new ByteArrayInputStream(data)) {
+            final GenericRecord avroRecord = readRecord(in, schema);
+
+            for (final Object[] decimal : decimals) {
+                final Schema decimalSchema = schema.getField("decimal" + decimal[0]).schema();
+                final LogicalType logicalType = decimalSchema.getLogicalType();
+                Assert.assertEquals(decimal[3], new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal" + decimal[0]), decimalSchema, logicalType));
+            }
+        }
+    }
+
+    @Test
     public void testLogicalTypes() throws IOException, ParseException {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
         testLogicalTypes(schema);
@@ -159,8 +207,7 @@ public abstract class TestWriteAvroResult {
         fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
-        // Avro decimal is represented as double in NiFi type system.
-        fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(5,2)));
         final RecordSchema recordSchema = new SimpleRecordSchema(fields);
 
         final String expectedTime = "2017-04-04 14:20:33.789";
@@ -174,9 +221,7 @@ public abstract class TestWriteAvroResult {
         values.put("timestampMillis", new Timestamp(timeLong));
         values.put("timestampMicros", new Timestamp(timeLong));
         values.put("date", new Date(timeLong));
-        // Avro decimal is represented as double in NiFi type system.
-        final BigDecimal expectedDecimal = new BigDecimal("123.45");
-        values.put("decimal", expectedDecimal.doubleValue());
+        values.put("decimal", new BigDecimal("123.45"));
         final Record record = new MapRecord(recordSchema, values);
 
         try (final RecordSetWriter writer = createWriter(schema, baos)) {
@@ -201,7 +246,7 @@ public abstract class TestWriteAvroResult {
                     // Union type doesn't return logical type. Find the first logical type defined within the union.
                     : decimalSchema.getTypes().stream().map(s -> s.getLogicalType()).filter(Objects::nonNull).findFirst().get();
             final BigDecimal decimal = new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal"), decimalSchema, logicalType);
-            assertEquals(expectedDecimal, decimal);
+            assertEquals(new BigDecimal("123.45"), decimal);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index 230a0e3..ca36430 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -36,6 +36,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Time;
@@ -45,6 +46,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -120,6 +122,26 @@ public class TestCSVRecordReader {
     }
 
     @Test
+    public void testBigDecimal() throws IOException, MalformedRecordException {
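+        // A 500-digit literal overflows a double, so only a DECIMAL field can carry it exactly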
+        final String value = String.join("", Collections.nCopies(500, "1")) + ".2";
+        final String text = "decimal\n" + value;
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
+             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), StandardCharsets.UTF_8.name())) {
+
+            final Record record = reader.nextRecord();
+            final BigDecimal result = (BigDecimal) record.getValue("decimal");
+
+            assertEquals(new BigDecimal(value), result);
+        }
+    }
+
+    @Test
     public void testDateNoCoersionExpectedFormat() throws IOException, MalformedRecordException {
         final String text = "date\n11/30/1983";
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index bb3d0fe..d5569f0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
@@ -126,6 +127,7 @@ public class TestWriteCSVResult {
             valueMap.put("long", 8L);
             valueMap.put("float", 8.0F);
             valueMap.put("double", 8.0D);
+            valueMap.put("decimal", BigDecimal.valueOf(8.1D));
             valueMap.put("date", new Date(now));
             valueMap.put("time", new Time(now));
             valueMap.put("timestamp", new Timestamp(now));
@@ -154,7 +156,7 @@ public class TestWriteCSVResult {
 
         final String values = splits[1];
         final StringBuilder expectedBuilder = new StringBuilder();
-        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
+        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
 
         final String expectedValues = expectedBuilder.toString();
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 0e50764..82b5b87 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -75,10 +75,10 @@ public class TestJsonSchemaInference {
         assertSame(RecordFieldType.LONG, schema.getDataType("setc").get().getFieldType());
         assertSame(RecordFieldType.LONG, schema.getDataType("boolc").get().getFieldType());
         assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("binaryc").get());
+        // We currently do not infer DECIMAL from JSON because the ObjectMapper used by InferenceSchemaStrategy reads such values as double
 
         final List<String> fieldNames = schema.getFieldNames();
         assertEquals(Arrays.asList("varcharc", "uuid", "tinyintc", "textc", "datec", "smallintc", "mediumintc", "intc", "bigintc",
                 "floatc", "doublec", "decimalc", "timestampc", "timec", "charc", "tinytextc", "blobc", "mediumtextc", "enumc", "setc", "boolc", "binaryc"), fieldNames);
     }
-
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 759a5ff..bea213f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -38,6 +38,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,10 +60,14 @@ public class TestJsonTreeRowRecordReader {
     private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
 
     private List<RecordField> getDefaultFields() {
+        return getFields(RecordFieldType.DOUBLE.getDataType());
+    }
+
+    private List<RecordField> getFields(final DataType balanceDataType) {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("balance", balanceDataType));
         fields.add(new RecordField("address", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("city", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("state", RecordFieldType.STRING.getDataType()));
@@ -249,7 +254,8 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testReadMultilineJSON() throws IOException, MalformedRecordException {
-        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
@@ -260,14 +266,14 @@ public class TestJsonTreeRowRecordReader {
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
             final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+                    RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
-            Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+            Assert.assertArrayEquals(new Object[] {1, "John Doe", BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
 
             final Object[] secondRecordValues = reader.nextRecord().getValues();
-            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+            Assert.assertArrayEquals(new Object[] {2, "Jane Doe", BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
 
             assertNull(reader.nextRecord());
         }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index b941c09..e8b5cef 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -35,6 +35,7 @@ import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -94,6 +95,7 @@ public class TestWriteJsonResult {
         valueMap.put("long", 8L);
         valueMap.put("float", 8.0F);
         valueMap.put("double", 8.0D);
+        valueMap.put("decimal", BigDecimal.valueOf(8.1D));
         valueMap.put("date", new Date(time));
         valueMap.put("time", new Time(time));
         valueMap.put("timestamp", new Timestamp(time));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
index caf0038..682abcd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
@@ -144,6 +144,26 @@ public class TestFieldTypeInference {
         runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
     }
 
+    @Test
+    public void testToDataTypeWhenDecimal() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.DECIMAL.getDecimalDataType(10, 1),
+                RecordFieldType.DECIMAL.getDecimalDataType(10, 3),
+                RecordFieldType.DECIMAL.getDecimalDataType(7, 3),
+                RecordFieldType.DECIMAL.getDecimalDataType(7, 5),
+                RecordFieldType.DECIMAL.getDecimalDataType(7, 7),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType()
+        );
+
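+        // The merged type keeps the maximum precision (10) and maximum scale (7) of the candidates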
+        DataType expected = RecordFieldType.DECIMAL.getDecimalDataType(10, 7);
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+    }
+
     private SimpleRecordSchema createRecordSchema(String fieldName, DataType fieldType) {
         return new SimpleRecordSchema(Arrays.asList(
                 new RecordField(fieldName, fieldType)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
index 6ab9229..0cdd217 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
@@ -189,6 +189,7 @@ public class TestWriteXMLResult {
         valueMap.put("long", 8L);
         valueMap.put("float", 8.0F);
         valueMap.put("double", 8.0D);
+        valueMap.put("decimal", 8.1D);
         valueMap.put("date", new Date(time));
         valueMap.put("time", new Time(time));
         valueMap.put("timestamp", new Timestamp(time));
@@ -207,8 +208,8 @@ public class TestWriteXMLResult {
         writer.flush();
 
         String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><char>c</char><short>8</short>" +
-                "<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><date>2017-01-01</date>" +
-                "<time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
+                "<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><decimal>8.1</decimal>" +
+                "<date>2017-01-01</date><time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
                 "<map><height>48</height><width>96</width></map></RECORD></ROOT>";
 
         assertThat(xmlResult, CompareMatcher.isSimilarTo(out.toString()).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
index ef3d692..252b572 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
@@ -33,6 +33,7 @@ import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -401,6 +402,22 @@ public class TestXMLRecordReader {
     }
 
     @Test
+    public void testSimpleRecordWithAttribute6() throws IOException, MalformedRecordException {
+        // given
+        final InputStream is = new FileInputStream("src/test/resources/xml/people2.xml");
+        final List<RecordField> fields = getSimpleRecordFields();
+        fields.add(new RecordField("ID", RecordFieldType.DECIMAL.getDecimalDataType(38, 10)));
+        final XMLRecordReader reader = new XMLRecordReader(is, new SimpleRecordSchema(fields), true,
+                null, "CONTENT", dateFormat, timeFormat, timestampFormat, Mockito.mock(ComponentLog.class));
+
+        // when
+        final Record record = reader.nextRecord(true, false);
+
+        // then
+        assertEquals(BigDecimal.class, record.getValue("ID").getClass());
+    }
+
+    @Test
     public void testSimpleRecordWithAttributeCoerceFalseDropFalse() throws IOException, MalformedRecordException {
         InputStream is = new FileInputStream("src/test/resources/xml/people.xml");
         List<RecordField> fields = getSimpleRecordFields();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc
new file mode 100644
index 0000000..7529378
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc
@@ -0,0 +1,36 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+        "name" : "decimal1", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }, {
+        "name" : "decimal2", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }, {
+        "name" : "decimal3", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }, {
+        "name" : "decimal4", "type": {
+            "type" : "bytes",
+            "logicalType" : "decimal",
+            "precision" : 10,
+            "scale" : 2
+        }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
index 0472512..e6de436 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
@@ -7,6 +7,7 @@
   "bigint" : 8,
   "float" : 8.0,
   "double" : 8.0,
+  "decimal" : 8.1,
   "timestamp" : "2017-01-01 17:00:00",
   "date" : "2017-01-01",
   "time" : "17:00:00",