You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/02 19:13:59 UTC
[nifi] 01/02: NIFI-7369 Adding decimal support for record handling
in order to avoid missing precision when reading in records
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5c2bfcf7d3b8fdafe70ee4920645bb3e8fa5e539
Author: Bence Simon <si...@gmail.com>
AuthorDate: Wed Apr 22 15:48:10 2020 +0200
NIFI-7369 Adding decimal support for record handling in order to avoid missing precision when reading in records
Signed-off-by: Mark Payne <ma...@hotmail.com>
---
.../nifi/serialization/record/RecordFieldType.java | 19 ++-
.../serialization/record/ResultSetRecordSet.java | 29 +++-
.../serialization/record/type/DecimalDataType.java | 70 +++++++++
.../serialization/record/util/DataTypeUtils.java | 94 +++++++++++-
.../record/ResultSetRecordSetTest.java | 165 +++++++++++++++++++++
.../serialization/record/TestDataTypeUtils.java | 136 +++++++++++++++++
.../elasticsearch/PutElasticsearchHttpRecord.java | 3 +
.../TestPutElasticsearchHttpRecord.java | 4 +-
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 11 +-
.../schema/access/InferenceSchemaStrategy.java | 4 +
.../org/apache/nifi/avro/TestAvroTypeUtil.java | 42 +++++-
.../schema/access/InferenceSchemaStrategyTest.java | 143 ++++++++++++++++++
.../schema/validation/StandardSchemaValidator.java | 10 +-
.../validation/TestStandardSchemaValidator.java | 13 +-
.../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java | 6 +
.../org/apache/nifi/util/orc/TestNiFiOrcUtils.java | 3 +
.../apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java | 15 ++
.../org/apache/nifi/processors/orc/PutORCTest.java | 15 +-
.../org/apache/nifi/util/orc/TestNiFiOrcUtils.java | 20 +++
.../nifi/controller/kudu/KuduLookupService.java | 6 +-
.../processors/kudu/AbstractKuduProcessor.java | 38 ++++-
.../org/apache/nifi/processors/kudu/PutKudu.java | 9 +-
.../apache/nifi/processors/kudu/MockPutKudu.java | 15 +-
.../apache/nifi/processors/kudu/TestPutKudu.java | 30 +++-
.../reporting/prometheus/PrometheusRecordSink.java | 1 +
.../prometheus/TestPrometheusRecordSink.java | 16 +-
.../nifi/rules/handlers/RecordSinkHandler.java | 5 +
.../nifi/rules/handlers/TestRecordSinkHandler.java | 4 +
.../reporting/AbstractSiteToSiteReportingTask.java | 1 +
.../org/apache/nifi/processors/solr/SolrUtils.java | 5 +-
.../apache/nifi/processors/solr/SolrUtilsTest.java | 62 ++++++++
.../org/apache/nifi/queryrecord/FlowFileTable.java | 3 +
.../nifi-record-serialization-services/pom.xml | 1 +
.../apache/nifi/csv/AbstractCSVRecordReader.java | 1 +
.../org/apache/nifi/csv/CSVSchemaInference.java | 5 +
.../java/org/apache/nifi/csv/WriteCSVResult.java | 1 +
.../org/apache/nifi/json/JsonSchemaInference.java | 8 +
.../apache/nifi/json/JsonTreeRowRecordReader.java | 1 +
.../java/org/apache/nifi/json/WriteJsonResult.java | 3 +
.../java/org/apache/nifi/xml/WriteXMLResult.java | 1 +
.../java/org/apache/nifi/xml/XMLRecordReader.java | 2 +
.../nifi/xml/inference/XmlSchemaInference.java | 5 +
.../avro/TestAvroReaderWithEmbeddedSchema.java | 4 +-
.../org/apache/nifi/avro/TestWriteAvroResult.java | 57 ++++++-
.../org/apache/nifi/csv/TestCSVRecordReader.java | 22 +++
.../org/apache/nifi/csv/TestWriteCSVResult.java | 4 +-
.../apache/nifi/json/TestJsonSchemaInference.java | 2 +-
.../nifi/json/TestJsonTreeRowRecordReader.java | 16 +-
.../org/apache/nifi/json/TestWriteJsonResult.java | 2 +
.../schema/inference/TestFieldTypeInference.java | 20 +++
.../org/apache/nifi/xml/TestWriteXMLResult.java | 5 +-
.../org/apache/nifi/xml/TestXMLRecordReader.java | 17 +++
.../src/test/resources/avro/decimals.avsc | 36 +++++
.../src/test/resources/json/output/dataTypes.json | 1 +
54 files changed, 1146 insertions(+), 65 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index 18b0781..4e18fb9 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
@@ -73,6 +74,11 @@ public enum RecordFieldType {
DOUBLE("double", FLOAT),
/**
+ * A decimal field type. Fields of this type use a {@code java.math.BigDecimal} value.
+ */
+ DECIMAL("decimal", FLOAT, DOUBLE),
+
+ /**
* A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
*/
TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
@@ -95,7 +101,7 @@ public enum RecordFieldType {
/**
* A String field type. Fields of this type use a {@code java.lang.String} value.
*/
- STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP),
+ STRING("string", BOOLEAN, BYTE, CHAR, SHORT, INT, BIGINT, LONG, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP),
/**
* <p>
@@ -349,6 +355,17 @@ public enum RecordFieldType {
}
/**
+ * Returns a Data Type that represents a decimal type with the given precision and scale.
+ *
+ * @param precision the precision of the decimal
+ * @param scale the scale of the decimal
+ * @return a DataType that represents a decimal with added information about it's precision and scale.
+ */
+ public DataType getDecimalDataType(final int precision, final int scale) {
+ return new DecimalDataType(precision, scale);
+ }
+
+ /**
* Determines whether or this this RecordFieldType is "wider" than the provided type. A type "A" is said to be wider
* than another type "B" iff A encompasses all values of B and more. For example, the LONG type is wider than INT, and INT
* is wider than SHORT. "Complex" types (MAP, RECORD, ARRAY, CHOICE) are not wider than any other type, and no other type is
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 953b511..3b98657 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Array;
import java.sql.ResultSet;
@@ -53,6 +54,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private static final String DATE_CLASS_NAME = Date.class.getName();
private static final String DOUBLE_CLASS_NAME = Double.class.getName();
private static final String FLOAT_CLASS_NAME = Float.class.getName();
+ private static final String BIGDECIMAL_CLASS_NAME = BigDecimal.class.getName();
public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
this.rs = rs;
@@ -194,6 +196,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
case Types.LONGVARBINARY:
case Types.VARBINARY:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ final BigDecimal bigDecimal = rs.getBigDecimal(columnIndex);
+ return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
case Types.OTHER: {
// If we have no records to inspect, we can't really know its schema so we simply use the default data type.
if (rs.isAfterLast()) {
@@ -212,8 +218,8 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final Object obj = rs.getObject(columnIndex);
if (!(obj instanceof Record)) {
final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
- RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME,
- RecordFieldType.TIMESTAMP)
+ RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING,
+ RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
.map(RecordFieldType::getDataType)
.collect(Collectors.toList());
@@ -234,7 +240,14 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
}
}
- return getFieldType(sqlType, rs.getMetaData().getColumnClassName(columnIndex)).getDataType();
+ final RecordFieldType fieldType = getFieldType(sqlType, rs.getMetaData().getColumnClassName(columnIndex));
+
+ if (RecordFieldType.DECIMAL.equals(fieldType)) {
+ final BigDecimal bigDecimalValue = rs.getBigDecimal(columnIndex);
+ return fieldType.getDecimalDataType(bigDecimalValue.precision(), bigDecimalValue.scale());
+ } else {
+ return fieldType.getDataType();
+ }
}
}
}
@@ -310,6 +323,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
if (valueToLookAt instanceof Double) {
return RecordFieldType.DOUBLE.getDataType();
}
+ if (valueToLookAt instanceof BigDecimal) {
+ final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
+ return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
+ }
if (valueToLookAt instanceof Boolean) {
return RecordFieldType.BOOLEAN.getDataType();
}
@@ -353,9 +370,10 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return RecordFieldType.CHAR;
case Types.DATE:
return RecordFieldType.DATE;
+ case Types.NUMERIC:
case Types.DECIMAL:
+ return RecordFieldType.DECIMAL;
case Types.DOUBLE:
- case Types.NUMERIC:
case Types.REAL:
return RecordFieldType.DOUBLE;
case Types.FLOAT:
@@ -393,6 +411,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
if (DOUBLE_CLASS_NAME.equals(valueClassName)) {
return RecordFieldType.DOUBLE;
}
+ if (BIGDECIMAL_CLASS_NAME.equals(valueClassName)) {
+ return RecordFieldType.DECIMAL;
+ }
return RecordFieldType.RECORD;
case Types.TIME:
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java
new file mode 100644
index 0000000..4020bcb
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/DecimalDataType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class DecimalDataType extends DataType {
+ private final int precision;
+ private final int scale;
+
+ public DecimalDataType(final int precision, final int scale) {
+ super(RecordFieldType.DECIMAL, null);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ public int getPrecision() {
+ return precision;
+ }
+
+ public int getScale() {
+ return scale;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 31;
+ hash = 41 * hash + getFieldType().hashCode();
+ hash = 41 * hash + precision;
+ hash = 41 * hash + scale;
+ return hash;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof DecimalDataType)) {
+ return false;
+ }
+
+ final DecimalDataType other = (DecimalDataType) obj;
+ return getPrecision() == other.getPrecision() && getScale() == other.getScale();
+ }
+
+ @Override
+ public String toString() {
+ return getFieldType().toString() + "[" + precision + "," + scale + "]";
+ }
+}
+
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f644fda..e1ee122 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -26,6 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.Reader;
import java.lang.reflect.Array;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -93,7 +95,14 @@ public class DataTypeUtils {
"(" + Base10Decimal + OptionalBase10Exponent + ")" +
")";
+ private static final String decimalRegex =
+ OptionalSign +
+ "(" + Base10Digits + OptionalBase10Decimal + ")" + "|" +
+ "(" + Base10Digits + OptionalBase10Decimal + Base10Exponent + ")" + "|" +
+ "(" + Base10Decimal + OptionalBase10Exponent + ")";
+
private static final Pattern FLOATING_POINT_PATTERN = Pattern.compile(doubleRegex);
+ private static final Pattern DECIMAL_PATTERN = Pattern.compile(decimalRegex);
private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
@@ -127,6 +136,7 @@ public class DataTypeUtils {
NUMERIC_VALIDATORS.put(RecordFieldType.SHORT, value -> value instanceof Short);
NUMERIC_VALIDATORS.put(RecordFieldType.DOUBLE, value -> value instanceof Double);
NUMERIC_VALIDATORS.put(RecordFieldType.FLOAT, value -> value instanceof Float);
+ NUMERIC_VALIDATORS.put(RecordFieldType.DECIMAL, value -> value instanceof BigDecimal);
}
public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
@@ -174,6 +184,8 @@ public class DataTypeUtils {
return toCharacter(value, fieldName);
case DATE:
return toDate(value, dateFormat, fieldName);
+ case DECIMAL:
+ return toBigDecimal(value, fieldName);
case DOUBLE:
return toDouble(value, fieldName);
case FLOAT:
@@ -228,6 +240,8 @@ public class DataTypeUtils {
return isCharacterTypeCompatible(value);
case DATE:
return isDateTypeCompatible(value, dataType.getFormat());
+ case DECIMAL:
+ return isDecimalTypeCompatible(value);
case DOUBLE:
return isDoubleTypeCompatible(value);
case FLOAT:
@@ -483,6 +497,10 @@ public class DataTypeUtils {
if (value instanceof BigInteger) {
return RecordFieldType.BIGINT.getDataType();
}
+ if (value instanceof BigDecimal) {
+ final BigDecimal bigDecimal = (BigDecimal) value;
+ return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale());
+ }
}
if (value instanceof Boolean) {
@@ -1232,6 +1250,10 @@ public class DataTypeUtils {
return isNumberTypeCompatible(value, DataTypeUtils::isIntegral);
}
+ public static boolean isDecimalTypeCompatible(final Object value) {
+ return isNumberTypeCompatible(value, DataTypeUtils::isDecimal);
+ }
+
public static Boolean toBoolean(final Object value, final String fieldName) {
if (value == null) {
return null;
@@ -1266,6 +1288,50 @@ public class DataTypeUtils {
return false;
}
+ public static BigDecimal toBigDecimal(final Object value, final String fieldName) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof BigDecimal) {
+ return (BigDecimal) value;
+ }
+
+ if (value instanceof Number) {
+ final Number number = (Number) value;
+
+ if (number instanceof Byte
+ || number instanceof Short
+ || number instanceof Integer
+ || number instanceof Long) {
+ return BigDecimal.valueOf(number.longValue());
+ }
+
+ if (number instanceof BigInteger) {
+ return new BigDecimal((BigInteger) number);
+ }
+
+ if (number instanceof Float) {
+ return new BigDecimal(Float.toString((Float) number));
+ }
+
+ if (number instanceof Double) {
+ return new BigDecimal(Double.toString((Double) number));
+ }
+ }
+
+ if (value instanceof String) {
+ try {
+ return new BigDecimal((String) value);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigDecimal for field " + fieldName
+ + ", value is not a valid representation of BigDecimal", nfe);
+ }
+ }
+
+ throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigDecimal for field " + fieldName);
+ }
+
public static Double toDouble(final Object value, final String fieldName) {
if (value == null) {
return null;
@@ -1322,6 +1388,14 @@ public class DataTypeUtils {
return isNumberTypeCompatible(value, s -> isFloatingPoint(s));
}
+ private static boolean isDecimal(final String value) {
+ if (value == null || value.isEmpty()) {
+ return false;
+ }
+
+ return DECIMAL_PATTERN.matcher(value).matches();
+ }
+
private static boolean isFloatingPoint(final String value) {
if (value == null || value.isEmpty()) {
return false;
@@ -1691,15 +1765,31 @@ public class DataTypeUtils {
case FLOAT:
if (otherFieldType == RecordFieldType.DOUBLE) {
return Optional.of(otherDataType);
+ } else if (otherFieldType == RecordFieldType.DECIMAL) {
+ return Optional.of(otherDataType);
}
break;
case DOUBLE:
if (otherFieldType == RecordFieldType.FLOAT) {
return Optional.of(thisDataType);
+ } else if (otherFieldType == RecordFieldType.DECIMAL) {
+ return Optional.of(otherDataType);
}
break;
+ case DECIMAL:
+ if (otherFieldType == RecordFieldType.DOUBLE) {
+ return Optional.of(thisDataType);
+ } else if (otherFieldType == RecordFieldType.FLOAT) {
+ return Optional.of(thisDataType);
+ } else if (otherFieldType == RecordFieldType.DECIMAL) {
+ final DecimalDataType thisDecimalDataType = (DecimalDataType) thisDataType;
+ final DecimalDataType otherDecimalDataType = (DecimalDataType) otherDataType;
-
+ final int precision = Math.max(thisDecimalDataType.getPrecision(), otherDecimalDataType.getPrecision());
+ final int scale = Math.max(thisDecimalDataType.getScale(), otherDecimalDataType.getScale());
+ return Optional.of(RecordFieldType.DECIMAL.getDecimalDataType(precision, scale));
+ }
+ break;
case CHAR:
if (otherFieldType == RecordFieldType.STRING) {
return Optional.of(otherDataType);
@@ -1759,6 +1849,8 @@ public class DataTypeUtils {
return Types.DOUBLE;
case FLOAT:
return Types.FLOAT;
+ case DECIMAL:
+ return Types.NUMERIC;
case INT:
return Types.INTEGER;
case SHORT:
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
new file mode 100644
index 0000000..5c9a39c
--- /dev/null
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ResultSetRecordSetTest {
+ private static final Object[][] COLUMNS = new Object[][] {
+ // column number; column label / name / schema field; column type; schema data type;
+ {1, "varchar", Types.VARCHAR, RecordFieldType.STRING.getDataType()},
+ {2, "bigint", Types.BIGINT, RecordFieldType.LONG.getDataType()},
+ {3, "rowid", Types.ROWID, RecordFieldType.LONG.getDataType()},
+ {4, "bit", Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
+ {5, "boolean", Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
+ {6, "char", Types.CHAR, RecordFieldType.CHAR.getDataType()},
+ {7, "date", Types.DATE, RecordFieldType.DATE.getDataType()},
+ {8, "integer", Types.INTEGER, RecordFieldType.INT.getDataType()},
+ {9, "double", Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
+ {10, "real", Types.REAL, RecordFieldType.DOUBLE.getDataType()},
+ {11, "float", Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
+ {12, "smallint", Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
+ {13, "tinyint", Types.TINYINT, RecordFieldType.BYTE.getDataType()},
+ {14, "bigDecimal1", Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
+ {15, "bigDecimal2", Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
+ {16, "bigDecimal3", Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
+ };
+
+ @Mock
+ private ResultSet resultSet;
+
+ @Mock
+ private ResultSetMetaData resultSetMetaData;
+
+ @Before
+ public void setUp() throws SQLException {
+ Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+ Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
+
+ for (final Object[] column : COLUMNS) {
+ Mockito.when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((column[1]) + "Col");
+ Mockito.when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
+ Mockito.when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);
+ }
+
+ // Big decimal values are necessary in order to determine precision and scale
+ Mockito.when(resultSet.getBigDecimal(14)).thenReturn(BigDecimal.valueOf(1234.567D));
+ Mockito.when(resultSet.getBigDecimal(15)).thenReturn(BigDecimal.valueOf(1234L));
+ Mockito.when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
+
+ // This will be handled by a dedicated branch for Java Objects, needs some further details
+ Mockito.when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
+ }
+
+ @Test
+ public void testCreateSchema() throws SQLException {
+ // given
+ final RecordSchema recordSchema = givenRecordSchema();
+
+ // when
+ final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllColumnDataTypesAreCorrect(resultSchema);
+ }
+
+ @Test
+ public void testCreateSchemaWhenNoRecordSchema() throws SQLException {
+ // when
+ final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, null);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllColumnDataTypesAreCorrect(resultSchema);
+ }
+
+ @Test
+ public void testCreateSchemaWhenOtherType() throws SQLException {
+ // given
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("column", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+ final ResultSet resultSet = givenResultSetForOther();
+
+ // when
+ final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ Assert.assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
+ }
+
+ @Test
+ public void testCreateSchemaWhenOtherTypeWithoutSchema() throws SQLException {
+ // given
+ final ResultSet resultSet = givenResultSetForOther();
+
+ // when
+ final ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, null);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ Assert.assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
+ }
+
+ private ResultSet givenResultSetForOther() throws SQLException {
+ final ResultSet resultSet = Mockito.mock(ResultSet.class);
+ final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
+ Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+ Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(1);
+ Mockito.when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
+ Mockito.when(resultSetMetaData.getColumnName(1)).thenReturn("column");
+ Mockito.when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
+ return resultSet;
+ }
+
+ private RecordSchema givenRecordSchema() {
+ final List<RecordField> fields = new ArrayList<>();
+
+ for (final Object[] column : COLUMNS) {
+ fields.add(new RecordField((String) column[1], (DataType) column[3]));
+ }
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) {
+ Assert.assertNotNull(resultSchema);
+
+ for (final Object[] column : COLUMNS) {
+ Assert.assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 6d35789..d82be97 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -24,9 +24,11 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.junit.Test;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -36,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.DoubleAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -266,6 +269,113 @@ public class TestDataTypeUtils {
}
@Test
+ public void testConvertToBigDecimalWhenInputIsValid() {
+ // given
+ final BigDecimal expectedValue = BigDecimal.valueOf(12L);
+
+ // when & then
+ whenExpectingValidBigDecimalConversion(expectedValue, BigDecimal.valueOf(12L));
+ whenExpectingValidBigDecimalConversion(expectedValue, (byte) 12);
+ whenExpectingValidBigDecimalConversion(expectedValue, (short) 12);
+ whenExpectingValidBigDecimalConversion(expectedValue, 12);
+ whenExpectingValidBigDecimalConversion(expectedValue, 12L);
+ whenExpectingValidBigDecimalConversion(expectedValue, BigInteger.valueOf(12L));
+ whenExpectingValidBigDecimalConversion(expectedValue, 12F);
+ whenExpectingValidBigDecimalConversion(expectedValue, 12.000F);
+ whenExpectingValidBigDecimalConversion(expectedValue, 12D);
+ whenExpectingValidBigDecimalConversion(expectedValue, 12.000D);
+ whenExpectingValidBigDecimalConversion(expectedValue, "12");
+ whenExpectingValidBigDecimalConversion(expectedValue, "12.000");
+ }
+
+ @Test
+ public void testConvertToBigDecimalWhenNullInput() {
+ assertNull(DataTypeUtils.convertType(null, RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8));
+ }
+
+ @Test(expected = IllegalTypeConversionException.class)
+ public void testConvertToBigDecimalWhenInputStringIsInvalid() {
+ DataTypeUtils.convertType("test", RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+ }
+
+ @Test(expected = IllegalTypeConversionException.class)
+ public void testConvertToBigDecimalWhenUnsupportedType() {
+ DataTypeUtils.convertType(new ArrayList<Double>(), RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+ }
+
+ @Test(expected = IllegalTypeConversionException.class)
+ public void testConvertToBigDecimalWhenUnsupportedNumberType() {
+ DataTypeUtils.convertType(new DoubleAdder(), RecordFieldType.DECIMAL.getDecimalDataType(30, 10), null, StandardCharsets.UTF_8);
+ }
+
+ @Test
+ public void testCompatibleDataTypeBigDecimal() {
+ // given
+ final DataType dataType = RecordFieldType.DECIMAL.getDecimalDataType(30, 10);
+
+ // when & then
+ assertTrue(DataTypeUtils.isCompatibleDataType(new BigDecimal("1.2345678901234567890"), dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType(new BigInteger("12345678901234567890"), dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType(1234567890123456789L, dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType(1, dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType((byte) 1, dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType((short) 1, dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType("1.2345678901234567890", dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType(3.1F, dataType));
+ assertTrue(DataTypeUtils.isCompatibleDataType(3.0D, dataType));
+ assertFalse(DataTypeUtils.isCompatibleDataType("1234567XYZ", dataType));
+ assertFalse(DataTypeUtils.isCompatibleDataType(new Long[]{1L, 2L}, dataType));
+ }
+
+ @Test
+ public void testInferDataTypeWithBigDecimal() {
+ assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(3, 1), DataTypeUtils.inferDataType(BigDecimal.valueOf(12.3D), null));
+ }
+
+ @Test
+ public void testIsBigDecimalTypeCompatible() {
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible((byte) 13));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible((short) 13));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(12));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(12L));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigInteger.valueOf(12L)));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123F));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(12.123D));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible(BigDecimal.valueOf(12.123D)));
+ assertTrue(DataTypeUtils.isDecimalTypeCompatible("123"));
+
+ assertFalse(DataTypeUtils.isDecimalTypeCompatible(null));
+ assertFalse(DataTypeUtils.isDecimalTypeCompatible("test"));
+ assertFalse(DataTypeUtils.isDecimalTypeCompatible(new ArrayList<>()));
+ // Decimal handling does not support NaN and Infinity as the underlying BigDecimal is unable to parse
+ assertFalse(DataTypeUtils.isDecimalTypeCompatible("NaN"));
+ assertFalse(DataTypeUtils.isDecimalTypeCompatible("Infinity"));
+ }
+
+ @Test
+ public void testGetSQLTypeValueWithBigDecimal() {
+ assertEquals(Types.NUMERIC, DataTypeUtils.getSQLTypeValue(RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+ }
+
+ @Test
+ public void testChooseDataTypeWhenExpectedIsBigDecimal() {
+ // GIVEN
+ final List<DataType> dataTypes = Arrays.asList(
+ RecordFieldType.FLOAT.getDataType(),
+ RecordFieldType.DOUBLE.getDataType(),
+ RecordFieldType.DECIMAL.getDecimalDataType(2, 1),
+ RecordFieldType.DECIMAL.getDecimalDataType(20, 10)
+ );
+
+ final Object value = new BigDecimal("1.2");
+ final DataType expected = RecordFieldType.DECIMAL.getDecimalDataType(2, 1);
+
+ // WHEN
+ // THEN
+ testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+ }
+
+ @Test
public void testFloatingPointCompatibility() {
final String[] prefixes = new String[] {"", "-", "+"};
final String[] exponents = new String[] {"e0", "e1", "e-1", "E0", "E1", "E-1"};
@@ -504,6 +614,11 @@ public class TestDataTypeUtils {
}
@Test
+ public void testFindMostSuitableTypeWithBigDecimal() {
+ testFindMostSuitableType(BigDecimal.valueOf(123.456D), RecordFieldType.DECIMAL.getDecimalDataType(6, 3));
+ }
+
+ @Test
public void testFindMostSuitableTypeWithFloat() {
testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType());
}
@@ -580,6 +695,27 @@ public class TestDataTypeUtils {
});
}
+ private void whenExpectingValidBigDecimalConversion(final BigDecimal expectedValue, final Object incomingValue) {
+ // Checking indirect conversion
+ final String failureMessage = "Conversion from " + incomingValue.getClass().getSimpleName() + " to " + expectedValue.getClass().getSimpleName() + " failed, when ";
+ final BigDecimal indirectResult = whenExpectingValidConversion(expectedValue, incomingValue, RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+ // In some cases, direct equality check comes with false negative as the changing representation brings in
+ // insignificant changes what might break the comparison. For example 12F will be represented as "12.0"
+ assertEquals(failureMessage + "indirect", 0, expectedValue.compareTo(indirectResult));
+
+ // Checking direct conversion
+ final BigDecimal directResult = DataTypeUtils.toBigDecimal(incomingValue, "field");
+ assertEquals(failureMessage + "direct", 0, expectedValue.compareTo(directResult));
+
+ }
+
+ private <T> T whenExpectingValidConversion(final T expectedValue, final Object incomingValue, final DataType dataType) {
+ final Object result = DataTypeUtils.convertType(incomingValue, dataType, null, StandardCharsets.UTF_8);
+ assertNotNull(result);
+ assertTrue(expectedValue.getClass().isInstance(result));
+ return (T) result;
+ }
+
@Test
public void testIsIntegerFitsToFloat() {
final int maxRepresentableInt = Double.valueOf(Math.pow(2, 24)).intValue();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index f1c45a0..2de4281 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -694,6 +694,9 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
generator.writeNumber((BigInteger) coercedValue);
}
break;
+ case DECIMAL:
+ generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
+ break;
case BOOLEAN:
final String stringValue = coercedValue.toString();
if ("true".equalsIgnoreCase(stringValue)) {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index afe2055..fa133b4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -41,6 +41,7 @@ import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
+import java.math.BigDecimal;
import java.net.ConnectException;
import java.sql.Date;
import java.sql.Time;
@@ -664,9 +665,10 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("date", RecordFieldType.DATE);
parser.addSchemaField("time", RecordFieldType.TIME);
parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
+ parser.addSchemaField("amount", RecordFieldType.DECIMAL);
for(int i=1; i<=numRecords; i++) {
- parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L));
+ parser.addRecord(i, "reç" + i, 100 + i, new Date(1545282000000L), new Time(68150000), new Timestamp(1545332150000L), new BigDecimal(Double.MAX_VALUE).multiply(BigDecimal.TEN));
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 950e4cb..f54d329 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -256,6 +257,11 @@ public class AvroTypeUtil {
case LONG:
schema = Schema.create(Type.LONG);
break;
+ case DECIMAL:
+ final DecimalDataType decimalDataType = (DecimalDataType) dataType;
+ schema = Schema.create(Type.BYTES);
+ LogicalTypes.decimal(decimalDataType.getPrecision(), decimalDataType.getScale()).addToSchema(schema);
+ break;
case MAP:
schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, false));
break;
@@ -341,9 +347,8 @@ public class AvroTypeUtil {
case LOGICAL_TYPE_TIMESTAMP_MICROS:
return RecordFieldType.TIMESTAMP.getDataType();
case LOGICAL_TYPE_DECIMAL:
- // We convert Decimal to Double.
- // Alternatively we could convert it to String, but numeric type is generally more preferable by users.
- return RecordFieldType.DOUBLE.getDataType();
+ final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+ return RecordFieldType.DECIMAL.getDecimalDataType(decimal.getPrecision(), decimal.getScale());
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
index 1b80886..1c12926 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/InferenceSchemaStrategy.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
@@ -60,6 +61,9 @@ public class InferenceSchemaStrategy implements JsonSchemaAccessStrategy {
field = new RecordField(entry.getKey(), RecordFieldType.DOUBLE.getDataType());
} else if (entry.getValue() instanceof Date) {
field = new RecordField(entry.getKey(), RecordFieldType.DATE.getDataType());
+ } else if (entry.getValue() instanceof BigDecimal) {
+ final BigDecimal bigDecimal = (BigDecimal) entry.getValue();
+ field = new RecordField(entry.getKey(), RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale()));
} else if (entry.getValue() instanceof List) {
field = new RecordField(entry.getKey(), RecordFieldType.ARRAY.getDataType());
} else if (entry.getValue() instanceof Map) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 412d573..6cc6f4f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -78,6 +78,7 @@ public class TestAvroTypeUtil {
fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType()));
fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType()));
fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType()));
+ fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
@@ -117,6 +118,7 @@ public class TestAvroTypeUtil {
assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get());
assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get());
assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get());
+ assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), afterConversion.getDataType("decimal").get());
assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get());
assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get());
assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get());
@@ -411,7 +413,7 @@ public class TestAvroTypeUtil {
public void testConvertAvroRecordToMapWithFieldTypeOfFixedAndLogicalTypeDecimal() {
// Create a field schema like {"type":"fixed","name":"amount","size":16,"logicalType":"decimal","precision":18,"scale":8}
final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
- final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);;
+ final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);
decimalType.addToSchema(fieldSchema);
// Create a field named "amount" using the field schema above
@@ -421,24 +423,50 @@ public class TestAvroTypeUtil {
final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
// Create an example Avro record with the amount field of type fixed and a logical type of decimal
- final BigDecimal expectedDecimalValue = new BigDecimal("1234567890.12345678");
+ final BigDecimal expectedBigDecimal = new BigDecimal("1234567890.12345678");
final GenericRecord genericRecord = new GenericData.Record(avroSchema);
- genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedDecimalValue, fieldSchema, decimalType));
+ genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedBigDecimal, fieldSchema, decimalType));
// Convert the Avro schema to a Record schema
+ thenConvertAvroSchemaToRecordSchema(avroSchema, expectedBigDecimal, genericRecord);
+ }
+
+ @Test
+ public void testConvertAvroRecordToMapWithFieldTypeOfBinaryAndLogicalTypeDecimal() {
+ // Create a field schema like {"type":"binary","name":"amount","logicalType":"decimal","precision":18,"scale":8}
+ final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
+ final Schema fieldSchema = Schema.create(Type.BYTES);
+ decimalType.addToSchema(fieldSchema);
+
+ // Create a field named "amount" using the field schema above
+ final Schema.Field field = new Schema.Field("amount", fieldSchema, null, (Object)null);
+
+ // Create an overall record schema with the amount field
+ final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
+
+ // Create an example Avro record with the amount field of type binary and a logical type of decimal
+ final BigDecimal expectedBigDecimal = new BigDecimal("1234567890.12345678");
+ final GenericRecord genericRecord = new GenericData.Record(avroSchema);
+ genericRecord.put("amount", new Conversions.DecimalConversion().toBytes(expectedBigDecimal, fieldSchema, decimalType));
+
+ // Convert the Avro schema to a Record schema
+ thenConvertAvroSchemaToRecordSchema(avroSchema, expectedBigDecimal, genericRecord);
+ }
+
+ private void thenConvertAvroSchemaToRecordSchema(Schema avroSchema, BigDecimal expectedBigDecimal, GenericRecord genericRecord) {
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
// Convert the Avro record a Map and verify the object produced is the same BigDecimal that was converted to fixed
- final Map<String,Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
+ final Map<String, Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
assertNotNull(convertedMap);
assertEquals(1, convertedMap.size());
final Object resultObject = convertedMap.get("amount");
assertNotNull(resultObject);
- assertTrue(resultObject instanceof Double);
+ assertTrue(resultObject instanceof BigDecimal);
- final Double resultDouble = (Double) resultObject;
- assertEquals(Double.valueOf(expectedDecimalValue.doubleValue()), resultDouble);
+ final BigDecimal resultBigDecimal = (BigDecimal) resultObject;
+ assertEquals(expectedBigDecimal, resultBigDecimal);
}
@Test
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
new file mode 100644
index 0000000..e2d82a9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/schema/access/InferenceSchemaStrategyTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class InferenceSchemaStrategyTest {
+
+ private static final Object[][] CONTENT_FIELDS = new Object[][] {
+ {"integer", 1, RecordFieldType.INT.getDataType()},
+ {"long", 1L, RecordFieldType.LONG.getDataType()},
+ {"boolean", true, RecordFieldType.BOOLEAN.getDataType()},
+ {"double", 1D, RecordFieldType.DOUBLE.getDataType()},
+ {"date", new Date(), RecordFieldType.DATE.getDataType()},
+ {"decimal", BigDecimal.valueOf(123.456D), RecordFieldType.DECIMAL.getDecimalDataType(6, 3)},
+ {"array", new ArrayList<String>(), RecordFieldType.ARRAY.getDataType()},
+
+ // date subclasses
+ {"time", new Time(System.currentTimeMillis()), RecordFieldType.DATE.getDataType()},
+ {"timestamp", new Timestamp(System.currentTimeMillis()), RecordFieldType.DATE.getDataType()},
+
+ // others are considered as string
+ {"byte", (byte) 1, RecordFieldType.STRING.getDataType()},
+ {"short", (short) 1, RecordFieldType.STRING.getDataType()},
+ {"bigint", BigInteger.ONE, RecordFieldType.STRING.getDataType()},
+ {"float", (float) 1, RecordFieldType.STRING.getDataType()},
+ {"char", (char) 1, RecordFieldType.STRING.getDataType()},
+ };
+
+ private final InferenceSchemaStrategy testSubject = new InferenceSchemaStrategy();
+
+ @Test
+ public void testSchemaConversion() throws Exception {
+ // when
+ final RecordSchema result = testSubject.getSchema(null, givenContent(), null);
+
+ // then
+ Assert.assertNotNull(result);
+ thenFieldsAreConvertedProperly(result, true);
+ }
+
+ @Test
+ public void testSchemaConversionWhenMap() throws Exception {
+ // given
+ final Map<String, Object> input = new HashMap<>();
+ final Map<String, Integer> field = new HashMap<>();
+ field.put("a1", 1);
+ field.put("a2", 2);
+ input.put("f1", field);
+
+ // when
+ final RecordSchema result = testSubject.getSchema(null, input, null);
+
+ // then
+ Assert.assertNotNull(result);
+ Assert.assertTrue(RecordDataType.class.isInstance(result.getField("f1").get().getDataType()));
+ final RecordDataType recordDataType = (RecordDataType) result.getField("f1").get().getDataType();
+
+ final RecordSchema childSchema = recordDataType.getChildSchema();
+ Assert.assertNotNull(childSchema);
+ Assert.assertEquals(RecordFieldType.INT.getDataType(), childSchema.getField("a1").get().getDataType());
+ Assert.assertEquals(RecordFieldType.INT.getDataType(), childSchema.getField("a2").get().getDataType());
+ }
+
+ @Test
+ public void testSchemaConversionFromJsonString() throws Exception {
+ // given
+ final String json = "{\"double\":1.0,\"integer\":1,\"long\":9223372036854775,\"boolean\":true,\"array\":[]}";
+
+ // when
+ final RecordSchema result = testSubject.getSchema( null, new ByteArrayInputStream(json.getBytes()), null);
+
+ // then
+ Assert.assertNotNull(result);
+ thenFieldsAreConvertedProperly(result, false);
+ }
+
+ private Map<String, Object> givenContent() {
+ final HashMap<String, Object> result = new HashMap<>();
+
+ for (final Object[] contentField : CONTENT_FIELDS) {
+ result.put((String) contentField[0], contentField[1]);
+ }
+
+ return result;
+ }
+
+ private Map<String, DataType> givenExpected() {
+ final HashMap<String, DataType> result = new HashMap<>();
+
+ for (final Object[] contentField : CONTENT_FIELDS) {
+ result.put((String) contentField[0], (DataType) contentField[2]);
+ }
+
+ return result;
+ }
+
+ private void thenFieldsAreConvertedProperly(final RecordSchema result, final boolean mustPresent) {
+ final List<RecordField> fields = result.getFields();
+
+ for (final Map.Entry<String, DataType> expected : givenExpected().entrySet()) {
+ final Optional<RecordField> field = fields.stream().filter(f -> f.getFieldName().equals(expected.getKey())).findFirst();
+
+ if (field.isPresent()) {
+ Assert.assertEquals("\"" + expected.getKey() + "\" is expected to be converted " + expected.getValue().toString(), expected.getValue(), field.get().getDataType());
+ } else if (mustPresent) {
+ Assert.fail();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
index 76e7b7d..18b7c7a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/validation/StandardSchemaValidator.java
@@ -32,11 +32,10 @@ import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
import org.apache.nifi.serialization.record.validation.ValidationError;
import org.apache.nifi.serialization.record.validation.ValidationErrorType;
+import java.math.BigInteger;
import java.util.Map;
public class StandardSchemaValidator implements RecordSchemaValidator {
-
-
private final SchemaValidationContext validationContext;
public StandardSchemaValidator(final SchemaValidationContext validationContext) {
@@ -273,6 +272,13 @@ public class StandardSchemaValidator implements RecordSchemaValidator {
|| DataTypeUtils.isIntegerFitsToFloat(value)
|| DataTypeUtils.isLongFitsToFloat(value)
|| DataTypeUtils.isBigIntFitsToFloat(value);
+ case DECIMAL:
+ return DataTypeUtils.isFittingNumberType(value, dataType.getFieldType())
+ || value instanceof Byte
+ || value instanceof Short
+ || value instanceof Integer
+ || value instanceof Long
+ || value instanceof BigInteger;
}
return false;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
index dabfa9f..3f4e102 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/validation/TestStandardSchemaValidator.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
@@ -67,7 +68,8 @@ public class TestStandardSchemaValidator {
RecordFieldType.LONG,
RecordFieldType.BIGINT,
RecordFieldType.FLOAT,
- RecordFieldType.DOUBLE
+ RecordFieldType.DOUBLE,
+ RecordFieldType.DECIMAL
));
@Test
@@ -117,6 +119,7 @@ public class TestStandardSchemaValidator {
valueMap.put("short", (short) 8);
valueMap.put("int", 9);
valueMap.put("bigint", BigInteger.valueOf(8L));
+ valueMap.put("decimal", BigDecimal.valueOf(8.1D));
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
@@ -145,6 +148,9 @@ public class TestStandardSchemaValidator {
valueMap.put("float_as_double", 8.0F);
+ valueMap.put("float_as_decimal", 8.0F);
+ valueMap.put("double_as_decimal", 8.0D);
+
final Record record = new MapRecord(schema, valueMap);
final SchemaValidationContext validationContext = new SchemaValidationContext(schema, false, true);
@@ -170,18 +176,21 @@ public class TestStandardSchemaValidator {
public void testByteIsConsideredToBeValidFloatingPoint() {
whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.FLOAT);
whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.DOUBLE);
+ whenValueIsAcceptedAsDataTypeThenConsideredAsValid((byte) 9, RecordFieldType.DECIMAL);
}
@Test
public void testShortIsConsideredToBeValidFloatingPoint() {
whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.FLOAT);
whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.DOUBLE);
+ whenValueIsAcceptedAsDataTypeThenConsideredAsValid((short) 9, RecordFieldType.DECIMAL);
}
@Test
public void testIntegerWithinRangeIsConsideredToBeValidFloatingPoint() {
whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_FLOAT.intValue(), RecordFieldType.FLOAT);
whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Integer.MAX_VALUE, RecordFieldType.DOUBLE);
+ whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Integer.MAX_VALUE, RecordFieldType.DECIMAL);
}
@Test
@@ -194,6 +203,7 @@ public class TestStandardSchemaValidator {
public void testLongWithinRangeIsConsideredToBeValidFloatingPoint() {
whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_FLOAT, RecordFieldType.FLOAT);
whenValueIsAcceptedAsDataTypeThenConsideredAsValid(MAX_PRECISE_WHOLE_IN_DOUBLE, RecordFieldType.DOUBLE);
+ whenValueIsAcceptedAsDataTypeThenConsideredAsValid(Long.MAX_VALUE, RecordFieldType.DECIMAL);
}
@Test
@@ -206,6 +216,7 @@ public class TestStandardSchemaValidator {
public void testBigintWithinRangeIsConsideredToBeValidFloatingPoint() {
whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(5L), RecordFieldType.FLOAT);
whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(5L), RecordFieldType.DOUBLE);
+ whenValueIsAcceptedAsDataTypeThenConsideredAsValid(BigInteger.valueOf(Long.MAX_VALUE), RecordFieldType.DECIMAL);
}
@Test
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index ce06f82..b782534 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -22,8 +22,10 @@ import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
@@ -44,6 +46,7 @@ import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.io.OutputStream;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -104,6 +107,9 @@ public class NiFiOrcUtils {
if (o instanceof Double) {
return new DoubleWritable((double) o);
}
+ if (o instanceof BigDecimal) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o));
+ }
if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
return new Text(o.toString());
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index cd7847f..74cdd2a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -22,6 +22,7 @@ import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -33,6 +34,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -199,6 +201,7 @@ public class TestNiFiOrcUtils {
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L) instanceof LongWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f) instanceof FloatWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0) instanceof DoubleWritable);
+ assertTrue(NiFiOrcUtils.convertToORCObject(null, BigDecimal.valueOf(1.0D)) instanceof HiveDecimalWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}) instanceof List);
assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3)) instanceof List);
Map<String, Float> map = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index f726010..8af55d5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hive.ql.io.orc;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -45,6 +47,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.orc.MemoryManager;
@@ -52,6 +55,7 @@ import org.apache.orc.OrcConf;
import org.apache.orc.impl.MemoryManagerImpl;
import java.io.IOException;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
@@ -104,6 +108,9 @@ public class NiFiOrcUtils {
if (o instanceof Double) {
return new DoubleWritable((double) o);
}
+ if (o instanceof BigDecimal) {
+ return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o));
+ }
if (o instanceof String) {
return new Text(o.toString());
}
@@ -286,6 +293,11 @@ public class NiFiOrcUtils {
|| RecordFieldType.STRING.equals(fieldType)) {
return getPrimitiveOrcTypeFromPrimitiveFieldType(dataType);
}
+
+ if (RecordFieldType.DECIMAL.equals(fieldType)) {
+ DecimalDataType decimalDataType = (DecimalDataType) dataType;
+ return TypeInfoFactory.getDecimalTypeInfo(decimalDataType.getPrecision(), decimalDataType.getScale());
+ }
if (RecordFieldType.DATE.equals(fieldType)) {
return TypeInfoFactory.dateTypeInfo;
}
@@ -407,6 +419,9 @@ public class NiFiOrcUtils {
if (RecordFieldType.FLOAT.equals(dataType)) {
return "FLOAT";
}
+ if (RecordFieldType.DECIMAL.equals(dataType)) {
+ return "DECIMAL";
+ }
if (RecordFieldType.STRING.equals(dataType)) {
return "STRING";
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index a6d0214..b8ed6c2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -66,6 +67,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
@@ -75,7 +77,6 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
-import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -126,7 +127,7 @@ public class PutORCTest {
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
- readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ readerFactory.addSchemaField(recordField);
}
if (recordGenerator == null) {
@@ -222,16 +223,14 @@ public class PutORCTest {
public void testWriteORCWithAvroLogicalTypes() throws IOException, InitializationException {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_logical_types.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
- Calendar now = Calendar.getInstance();
LocalTime nowTime = LocalTime.now();
LocalDateTime nowDateTime = LocalDateTime.now();
- LocalDate epoch = LocalDate.ofEpochDay(0);
LocalDate nowDate = LocalDate.now();
final int timeMillis = nowTime.get(ChronoField.MILLI_OF_DAY);
final Timestamp timestampMillis = Timestamp.valueOf(nowDateTime);
final Date dt = Date.valueOf(nowDate);
- final double dec = 1234.56;
+ final BigDecimal bigDecimal = new BigDecimal("92.12");
configure(proc, 10, (numUsers, readerFactory) -> {
for (int i = 0; i < numUsers; i++) {
@@ -240,7 +239,7 @@ public class PutORCTest {
timeMillis,
timestampMillis,
dt,
- dec);
+ bigDecimal);
}
return null;
});
@@ -265,7 +264,7 @@ public class PutORCTest {
mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
// DDL will be created with field names normalized (lowercased, e.g.) for Hive by default
mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
- "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DOUBLE) STORED AS ORC");
+ "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED AS ORC");
// verify we generated a provenance event
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
@@ -285,7 +284,7 @@ public class PutORCTest {
final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
assertEquals(noTimeOfDayDateFormat.format(dt), ((DateWritableV2) x.get(3)).get().toString());
- assertEquals(dec, ((DoubleWritable) x.get(4)).get(), Double.MIN_VALUE);
+ assertEquals(bigDecimal, ((HiveDecimalWritable) x.get(4)).getHiveDecimal().bigDecimalValue());
return null;
}
);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index bde6af8..19580ea 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -21,7 +21,9 @@ import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -31,13 +33,16 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Assert;
import org.junit.Test;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
@@ -170,6 +175,20 @@ public class TestNiFiOrcUtils {
assertEquals(TypeInfoCreator.createString(), orcType);
}
+
+ @Test
+ public void test_getOrcField_decimal() {
+ // given
+ final DecimalTypeInfo expected = TypeInfoFactory.getDecimalTypeInfo(4, 2);
+ final DataType decimalDataType = RecordFieldType.DECIMAL.getDecimalDataType(4, 2);
+
+ // when
+ final TypeInfo orcField = NiFiOrcUtils.getOrcField(decimalDataType, false);
+
+ // then
+ Assert.assertEquals(expected, orcField);
+ }
+
@Test
public void test_getPrimitiveOrcTypeFromPrimitiveFieldType() {
// Expected ORC types
@@ -205,6 +224,7 @@ public class TestNiFiOrcUtils {
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L, true) instanceof LongWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f, true) instanceof FloatWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0, true) instanceof DoubleWritable);
+ assertTrue(NiFiOrcUtils.convertToORCObject(null, BigDecimal.valueOf(1L), true) instanceof HiveDecimalWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}, true) instanceof List);
assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3), true) instanceof List);
Map<String, Float> map = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index b044f9a..bc59b12 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.kudu;
import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AsyncKuduClient;
@@ -311,12 +312,15 @@ public class KuduLookupService extends AbstractControllerService implements Reco
case INT64:
fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()));
break;
+ case DECIMAL:
+ final ColumnTypeAttributes attributes = cs.getTypeAttributes();
+ fields.add(new RecordField(cs.getName(), RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale())));
+ break;
case UNIXTIME_MICROS:
fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()));
break;
case BINARY:
case STRING:
- case DECIMAL:
fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
break;
case DOUBLE:
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index c41c8d5..b9639e5 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -18,8 +18,10 @@
package org.apache.nifi.processors.kudu;
import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
+import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
@@ -53,6 +55,8 @@ import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
@@ -369,7 +373,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
/**
* Converts a NiFi DataType to it's equivalent Kudu Type.
*/
- protected Type toKuduType(DataType nifiType) {
+ private Type toKuduType(DataType nifiType) {
switch (nifiType.getFieldType()) {
case BOOLEAN:
return Type.BOOL;
@@ -385,6 +389,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
+ case DECIMAL:
+ return Type.DECIMAL;
case TIMESTAMP:
return Type.UNIXTIME_MICROS;
case CHAR:
@@ -395,6 +401,36 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
}
}
+ private ColumnTypeAttributes getKuduTypeAttributes(final DataType nifiType) {
+ if (nifiType.getFieldType().equals(RecordFieldType.DECIMAL)) {
+ final DecimalDataType decimalDataType = (DecimalDataType) nifiType;
+ return new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(decimalDataType.getPrecision()).scale(decimalDataType.getScale()).build();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Based on NiFi field declaration, generates an alter statement to extend table with new column. Note: simply calling
+ * {@link AlterTableOptions#addNullableColumn(String, Type)} is not sufficient as it does not cover BigDecimal scale and precision handling.
+ *
+ * @param columnName Name of the new table column.
+ * @param nifiType Type of the field.
+ *
+ * @return Alter table statement to extend table with the new field.
+ */
+ protected AlterTableOptions getAddNullableColumnStatement(final String columnName, final DataType nifiType) {
+ final AlterTableOptions alterTable = new AlterTableOptions();
+
+ alterTable.addColumn(new ColumnSchema.ColumnSchemaBuilder(columnName, toKuduType(nifiType))
+ .nullable(true)
+ .defaultValue(null)
+ .typeAttributes(getKuduTypeAttributes(nifiType))
+ .build());
+
+ return alterTable;
+ }
+
private int getColumnIndex(Schema columns, String colName) {
try {
return columns.getColumnIndex(colName);
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 2ac0195..c0d7e46 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -18,11 +18,10 @@
package org.apache.nifi.processors.kudu;
import org.apache.kudu.Schema;
-import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
@@ -325,10 +324,8 @@ public class PutKudu extends AbstractKuduProcessor {
// we created by a concurrent thread or application attempting to handle schema drift.
for (RecordField field : missing) {
try {
- String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
- AlterTableOptions alter = new AlterTableOptions();
- alter.addNullableColumn(columnName, toKuduType(field.getDataType()));
- kuduClient.alterTable(tableName, alter);
+ final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
+ kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
} catch (KuduException e) {
// Ignore the exception if the column already exists due to concurrent
// threads or applications attempting to handle schema drift.
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 52ab00d..4f634fd 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kudu;
+import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
@@ -37,6 +38,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
@@ -47,6 +49,9 @@ public class MockPutKudu extends PutKudu {
private KuduSession session;
private LinkedList<Insert> insertQueue;
+ // Atomic reference is used as the set and use of the schema are in different thread
+ private AtomicReference<Schema> tableSchema = new AtomicReference<>();
+
private boolean loggedIn = false;
private boolean loggedOut = false;
@@ -102,7 +107,9 @@ public class MockPutKudu extends PutKudu {
final KuduClient client = mock(KuduClient.class);
try {
- when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
+ final KuduTable kuduTable = mock(KuduTable.class);
+ when(client.openTable(anyString())).thenReturn(kuduTable);
+ when(kuduTable.getSchema()).thenReturn(tableSchema.get());
} catch (final Exception e) {
throw new AssertionError(e);
}
@@ -173,7 +180,11 @@ public class MockPutKudu extends PutKudu {
}
@Override
- protected KuduSession createKuduSession(KuduClient client) {
+ protected KuduSession createKuduSession(final KuduClient client) {
return session;
}
+
+ void setTableSchema(final Schema tableSchema) {
+ this.tableSchema.set(tableSchema);
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 303a03b..97e3e3d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -25,9 +25,9 @@ import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
-import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
@@ -60,6 +60,7 @@ import org.mockito.stubbing.OngoingStubbing;
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -85,7 +86,7 @@ public class TestPutKudu {
public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
public static final String DEFAULT_MASTERS = "testLocalHost:7051";
public static final String SKIP_HEAD_LINE = "false";
- public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal";
+ public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal,decimalVal";
private TestRunner testRunner;
@@ -122,9 +123,10 @@ public class TestPutKudu {
readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+ readerFactory.addSchemaField(new RecordField("decimalVal", RecordFieldType.DECIMAL.getDecimalDataType(6, 3)));
for (int i=0; i < numOfRecord; i++) {
- readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+ readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i, new BigDecimal(111.111D).add(BigDecimal.valueOf(i)));
}
testRunner.addControllerService("mock-reader-factory", readerFactory);
@@ -260,6 +262,26 @@ public class TestPutKudu {
}
@Test
+ public void testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed() throws InitializationException, IOException {
+ // given
+ processor.setTableSchema(new Schema(Arrays.asList()));
+ createRecordReader(5);
+ final String filename = "testAddingMissingFieldsWhenHandleSchemaDriftIsAllowed-" + System.currentTimeMillis();
+
+ final Map<String,String> flowFileAttributes = new HashMap<>();
+ flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+ testRunner.setProperty(PutKudu.HANDLE_SCHEMA_DRIFT, "true");
+ testRunner.enqueue("trigger", flowFileAttributes);
+
+ // when
+ testRunner.run();
+
+ // then
+ testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
+ }
+
+ @Test
public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
createRecordReader(10);
@@ -288,7 +310,7 @@ public class TestPutKudu {
public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
createRecordReader(0);
// add the favorite color as a string
- readerFactory.addRecord(1, "name0", "0", "89.89");
+ readerFactory.addRecord(1, "name0", "0", "89.89", "111.111");
final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
index 3a4bd20..91fd7c2 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java
@@ -207,6 +207,7 @@ public class PrometheusRecordSink extends AbstractControllerService implements R
|| RecordFieldType.BIGINT.equals(dataType)
|| RecordFieldType.FLOAT.equals(dataType)
|| RecordFieldType.DOUBLE.equals(dataType)
+ || RecordFieldType.DECIMAL.equals(dataType)
|| RecordFieldType.BOOLEAN.equals(dataType);
}
diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
index 02a54b9..34bb657 100644
--- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
+++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java
@@ -29,8 +29,8 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.ListRecordSet;
@@ -47,6 +47,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
@@ -71,17 +72,20 @@ public class TestPrometheusRecordSink {
List<RecordField> recordFields = Arrays.asList(
new RecordField("field1", RecordFieldType.INT.getDataType()),
- new RecordField("field2", RecordFieldType.STRING.getDataType())
+ new RecordField("field2", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)),
+ new RecordField("field3", RecordFieldType.STRING.getDataType())
);
RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
Map<String, Object> row1 = new HashMap<>();
row1.put("field1", 15);
- row1.put("field2", "Hello");
+ row1.put("field2", BigDecimal.valueOf(12.34567D));
+ row1.put("field3", "Hello");
Map<String, Object> row2 = new HashMap<>();
row2.put("field1", 6);
- row2.put("field2", "World!");
+ row2.put("field2", BigDecimal.valueOf(0.1234567890123456789D));
+ row2.put("field3", "World!");
RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
new MapRecord(recordSchema, row1),
@@ -95,8 +99,10 @@ public class TestPrometheusRecordSink {
assertEquals(2, writeResult.getRecordCount());
assertEquals("Hello", writeResult.getAttributes().get("a"));
+
final String content = getMetrics();
- assertTrue(content.contains("field1{field2=\"Hello\",} 15.0\nfield1{field2=\"World!\",} 6.0\n"));
+ assertTrue(content.contains("field1{field3=\"Hello\",} 15.0\nfield1{field3=\"World!\",} 6.0\n"));
+ assertTrue(content.contains("field2{field3=\"Hello\",} 12.34567\nfield2{field3=\"World!\",} 0.12345678901234568\n"));
try {
sink.onStopped();
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
index 789ab00..55c0de4 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
@@ -122,6 +122,11 @@ public class RecordSinkHandler extends AbstractActionHandlerService{
if (value.contains(".")) {
try {
final double doubleValue = Double.parseDouble(value);
+
+ if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+ return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
+ }
+
if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
return RecordFieldType.DOUBLE.getDataType();
}
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
index 8566a2a..bd11534 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestRecordSinkHandler.java
@@ -34,6 +34,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -79,10 +80,12 @@ public class TestRecordSinkHandler {
final Map<String,String> attributes = new HashMap<>();
final Map<String,Object> metrics = new HashMap<>();
final String expectedMessage = "Records written to sink service:";
+ final BigDecimal bigDecimalValue = new BigDecimal(String.join("", Collections.nCopies(400, "1")) + ".2");
attributes.put("sendZeroResults","false");
metrics.put("jvmHeap","1000000");
metrics.put("cpu","90");
+ metrics.put("custom", bigDecimalValue);
final Action action = new Action();
action.setType("SEND");
@@ -96,6 +99,7 @@ public class TestRecordSinkHandler {
Map<String,Object> record = rows.get(0);
assertEquals("90", (record.get("cpu")));
assertEquals("1000000", (record.get("jvmHeap")));
+ assertEquals(bigDecimalValue, (record.get("custom")));
}
private static class MockRecordSinkHandler extends RecordSinkHandler {
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 5f8328f..cf63a70 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -365,6 +365,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
case FLOAT:
case INT:
case BIGINT:
+ case DECIMAL:
case LONG:
case SHORT:
case STRING:
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
index 60fa072..af1f2a4 100644
--- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
@@ -404,7 +404,7 @@ public class SolrUtils {
continue;
}else {
final DataType dataType = schema.getDataType(field.getFieldName()).get();
- writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex);
+ writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
}
}
}
@@ -462,6 +462,9 @@ public class SolrUtils {
addFieldToSolrDocument(inputDocument,fieldName, coercedValue,fieldsToIndex);
}
break;
+ case DECIMAL:
+ addFieldToSolrDocument(inputDocument, fieldName, DataTypeUtils.toBigDecimal(coercedValue, fieldName), fieldsToIndex);
+ break;
case BOOLEAN:
final String stringValue = coercedValue.toString();
if ("true".equalsIgnoreCase(stringValue)) {
diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
new file mode 100644
index 0000000..edc678e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/SolrUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SolrUtilsTest {
+
+ @Mock
+ private SolrInputDocument inputDocument;
+
+ @Test
+ public void test() throws Exception {
+ // given
+ final String value = "12345678901234567890.123456789012345678901234567890";
+ final BigDecimal bigDecimalValue = new BigDecimal(value);
+ final List<RecordField> fields = Collections.singletonList(new RecordField("test", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("test", bigDecimalValue);
+
+ final Record record = new MapRecord(new SimpleRecordSchema(fields), values);
+ final List<String> fieldsToIndex = Collections.singletonList("parent_test");
+
+ // when
+ SolrUtils.writeRecord(record, inputDocument, fieldsToIndex, "parent");
+
+ // then
+ Mockito.verify(inputDocument, Mockito.times(1)).addField("parent_test", bigDecimalValue);
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 18cbc63..4f0fec2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -46,6 +46,7 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import java.lang.reflect.Type;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
@@ -223,6 +224,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
return typeFactory.createJavaType(HashMap.class);
case BIGINT:
return typeFactory.createJavaType(BigInteger.class);
+ case DECIMAL:
+ return typeFactory.createJavaType(BigDecimal.class);
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index d888e3c..f17fe3c 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -133,6 +133,7 @@
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
+ <exclude>src/test/resources/avro/decimals.avsc</exclude>
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
index 746b1ce..f073464 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
@@ -104,6 +104,7 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
case LONG:
case FLOAT:
case DOUBLE:
+ case DECIMAL:
case BYTE:
case CHAR:
case SHORT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index 10b18a9..bc25744 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -89,6 +89,11 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
if (value.contains(".")) {
try {
final double doubleValue = Double.parseDouble(value);
+
+ if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+ return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
+ }
+
if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
return RecordFieldType.DOUBLE.getDataType();
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 6526ebe..733d3a5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -156,6 +156,7 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
switch (fieldType) {
case BIGINT:
case BYTE:
+ case DECIMAL:
case DOUBLE:
case FLOAT:
case LONG:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
index 02587cc..10b4129 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonSchemaInference.java
@@ -23,7 +23,9 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.DecimalNode;
+import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
@@ -62,6 +64,12 @@ public class JsonSchemaInference extends HierarchicalSchemaInference<JsonNode> {
return RecordFieldType.LONG.getDataType();
}
+ if (jsonNode.isBigDecimal()) {
+ final DecimalNode decimalNode = (DecimalNode) jsonNode;
+ final BigDecimal value = decimalNode.getDecimalValue();
+ return RecordFieldType.DECIMAL.getDecimalDataType(value.precision(), value.scale());
+ }
+
if (jsonNode.isFloatingPointNumber()) {
return RecordFieldType.DOUBLE.getDataType();
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index d0172e9..469eb80 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -160,6 +160,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
case BOOLEAN:
case BYTE:
case CHAR:
+ case DECIMAL:
case DOUBLE:
case FLOAT:
case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index dd7eaec..80c8512 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -376,6 +376,9 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
case STRING:
generator.writeString(coercedValue.toString());
break;
+ case DECIMAL:
+ generator.writeNumber(DataTypeUtils.toBigDecimal(coercedValue, fieldName));
+ break;
case BIGINT:
if (coercedValue instanceof Long) {
generator.writeNumber(((Long) coercedValue).longValue());
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
index 16a1e9e..8b4710b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java
@@ -250,6 +250,7 @@ public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSet
case BOOLEAN:
case BYTE:
case CHAR:
+ case DECIMAL:
case DOUBLE:
case FLOAT:
case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
index 4a2eacb..2cb165b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
@@ -145,6 +145,7 @@ public class XMLRecordReader implements RecordReader {
case BOOLEAN:
case BYTE:
case CHAR:
+ case DECIMAL:
case DOUBLE:
case FLOAT:
case INT:
@@ -528,6 +529,7 @@ public class XMLRecordReader implements RecordReader {
case BOOLEAN:
case BYTE:
case CHAR:
+ case DECIMAL:
case DOUBLE:
case FLOAT:
case INT:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
index 134e9a2..0e2ccab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
@@ -57,6 +57,11 @@ public class XmlSchemaInference extends HierarchicalSchemaInference<XmlNode> {
if (text.contains(".")) {
try {
final double doubleValue = Double.parseDouble(text);
+
+ if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
+ return RecordFieldType.DECIMAL.getDecimalDataType(text.length() - 1, text.length() - 1 - text.indexOf("."));
+ }
+
if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
return RecordFieldType.DOUBLE.getDataType();
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index b5fd869..d310d4f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -113,7 +113,7 @@ public class TestAvroReaderWithEmbeddedSchema {
assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMillis").get().getFieldType());
assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMicros").get().getFieldType());
assertEquals(RecordFieldType.DATE, recordSchema.getDataType("date").get().getFieldType());
- assertEquals(RecordFieldType.DOUBLE, recordSchema.getDataType("decimal").get().getFieldType());
+ assertEquals(RecordFieldType.DECIMAL, recordSchema.getDataType("decimal").get().getFieldType());
final Record record = reader.nextRecord();
assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMillis"));
@@ -123,7 +123,7 @@ public class TestAvroReaderWithEmbeddedSchema {
final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
- assertEquals(bigDecimal.doubleValue(), record.getValue("decimal"));
+ assertEquals(bigDecimal, record.getValue("decimal"));
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 7a35ba5..a25e80e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -33,6 +33,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -139,6 +140,53 @@ public abstract class TestWriteAvroResult {
}
@Test
+ public void testDecimalType() throws IOException {
+ final Object[][] decimals = new Object[][] {
+ // id, record field, value, expected value
+
+ // Uses the whole precision and scale
+ {1, RecordFieldType.DECIMAL.getDecimalDataType(10, 2), new BigDecimal("12345678.12"), new BigDecimal("12345678.12")},
+
+ // Uses less precision and scale than allowed
+ {2, RecordFieldType.DECIMAL.getDecimalDataType(10, 2), new BigDecimal("123456.1"), new BigDecimal("123456.10")},
+
+ // Record schema uses smaller precision and scale than allowed
+ {3, RecordFieldType.DECIMAL.getDecimalDataType(8, 1), new BigDecimal("123456.1"), new BigDecimal("123456.10")},
+
+ // Record schema uses bigger precision and scale than allowed
+ {4, RecordFieldType.DECIMAL.getDecimalDataType(16, 4), new BigDecimal("123456.1"), new BigDecimal("123456.10")},
+ };
+
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/decimals.avsc"));
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final List<RecordField> fields = new ArrayList<>();
+ final Map<String, Object> values = new HashMap<>();
+
+ for (final Object[] decimal : decimals) {
+ fields.add(new RecordField("decimal" + decimal[0], (DataType) decimal[1]));
+ values.put("decimal" + decimal[0], decimal[2]);
+ }
+
+ final Record record = new MapRecord(new SimpleRecordSchema(fields), values);
+
+ try (final RecordSetWriter writer = createWriter(schema, baos)) {
+ writer.write(RecordSet.of(record.getSchema(), record));
+ }
+
+ final byte[] data = baos.toByteArray();
+
+ try (final InputStream in = new ByteArrayInputStream(data)) {
+ final GenericRecord avroRecord = readRecord(in, schema);
+
+ for (final Object[] decimal : decimals) {
+ final Schema decimalSchema = schema.getField("decimal" + decimal[0]).schema();
+ final LogicalType logicalType = decimalSchema.getLogicalType();
+ Assert.assertEquals(decimal[3], new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal" + decimal[0]), decimalSchema, logicalType));
+ }
+ }
+ }
+
+ @Test
public void testLogicalTypes() throws IOException, ParseException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
testLogicalTypes(schema);
@@ -159,8 +207,7 @@ public abstract class TestWriteAvroResult {
fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
- // Avro decimal is represented as double in NiFi type system.
- fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(5,2)));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final String expectedTime = "2017-04-04 14:20:33.789";
@@ -174,9 +221,7 @@ public abstract class TestWriteAvroResult {
values.put("timestampMillis", new Timestamp(timeLong));
values.put("timestampMicros", new Timestamp(timeLong));
values.put("date", new Date(timeLong));
- // Avro decimal is represented as double in NiFi type system.
- final BigDecimal expectedDecimal = new BigDecimal("123.45");
- values.put("decimal", expectedDecimal.doubleValue());
+ values.put("decimal", new BigDecimal("123.45"));
final Record record = new MapRecord(recordSchema, values);
try (final RecordSetWriter writer = createWriter(schema, baos)) {
@@ -201,7 +246,7 @@ public abstract class TestWriteAvroResult {
// Union type doesn't return logical type. Find the first logical type defined within the union.
: decimalSchema.getTypes().stream().map(s -> s.getLogicalType()).filter(Objects::nonNull).findFirst().get();
final BigDecimal decimal = new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal"), decimalSchema, logicalType);
- assertEquals(expectedDecimal, decimal);
+ assertEquals(new BigDecimal("123.45"), decimal);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index 230a0e3..ca36430 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -36,6 +36,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Time;
@@ -45,6 +46,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
@@ -120,6 +122,26 @@ public class TestCSVRecordReader {
}
@Test
+ public void testBigDecimal() throws IOException, MalformedRecordException {
+ final String value = String.join("", Collections.nCopies(500, "1")) + ".2";
+ final String text = "decimal\n" + value;
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ try (final InputStream bais = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
+ final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+ RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), StandardCharsets.UTF_8.name())) {
+
+ final Record record = reader.nextRecord();
+ final BigDecimal result = (BigDecimal)record.getValue("decimal");
+
+ assertEquals(new BigDecimal(value), result);
+ }
+ }
+
+ @Test
public void testDateNoCoersionExpectedFormat() throws IOException, MalformedRecordException {
final String text = "date\n11/30/1983";
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index bb3d0fe..d5569f0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
@@ -126,6 +127,7 @@ public class TestWriteCSVResult {
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
+ valueMap.put("decimal", BigDecimal.valueOf(8.1D));
valueMap.put("date", new Date(now));
valueMap.put("time", new Time(now));
valueMap.put("timestamp", new Timestamp(now));
@@ -154,7 +156,7 @@ public class TestWriteCSVResult {
final String values = splits[1];
final StringBuilder expectedBuilder = new StringBuilder();
- expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
+ expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
final String expectedValues = expectedBuilder.toString();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 0e50764..82b5b87 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -75,10 +75,10 @@ public class TestJsonSchemaInference {
assertSame(RecordFieldType.LONG, schema.getDataType("setc").get().getFieldType());
assertSame(RecordFieldType.LONG, schema.getDataType("boolc").get().getFieldType());
assertEquals(RecordFieldType.STRING.getDataType(), schema.getDataType("binaryc").get());
+ // We currently do not read BigDecimal from JSON as ObjectMapper in InferenceSchemaStrategy automatically reads it as double
final List<String> fieldNames = schema.getFieldNames();
assertEquals(Arrays.asList("varcharc", "uuid", "tinyintc", "textc", "datec", "smallintc", "mediumintc", "intc", "bigintc",
"floatc", "doublec", "decimalc", "timestampc", "timec", "charc", "tinytextc", "blobc", "mediumtextc", "enumc", "setc", "boolc", "binaryc"), fieldNames);
}
-
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 759a5ff..bea213f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -38,6 +38,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@@ -59,10 +60,14 @@ public class TestJsonTreeRowRecordReader {
private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
private List<RecordField> getDefaultFields() {
+ return getFields(RecordFieldType.DOUBLE.getDataType());
+ }
+
+ private List<RecordField> getFields(final DataType balanceDataType) {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
- fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("balance", balanceDataType));
fields.add(new RecordField("address", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("city", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("state", RecordFieldType.STRING.getDataType()));
@@ -249,7 +254,8 @@ public class TestJsonTreeRowRecordReader {
@Test
public void testReadMultilineJSON() throws IOException, MalformedRecordException {
- final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+ final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
@@ -260,14 +266,14 @@ public class TestJsonTreeRowRecordReader {
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
- Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+ Assert.assertArrayEquals(new Object[] {1, "John Doe", BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
final Object[] secondRecordValues = reader.nextRecord().getValues();
- Assert.assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+ Assert.assertArrayEquals(new Object[] {2, "Jane Doe", BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
assertNull(reader.nextRecord());
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index b941c09..e8b5cef 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -35,6 +35,7 @@ import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -94,6 +95,7 @@ public class TestWriteJsonResult {
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
+ valueMap.put("decimal", BigDecimal.valueOf(8.1D));
valueMap.put("date", new Date(time));
valueMap.put("time", new Time(time));
valueMap.put("timestamp", new Timestamp(time));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
index caf0038..682abcd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
@@ -144,6 +144,26 @@ public class TestFieldTypeInference {
runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
}
+ @Test
+ public void testToDataTypeWhenDecimal() {
+ // GIVEN
+ List<DataType> dataTypes = Arrays.asList(
+ RecordFieldType.DECIMAL.getDecimalDataType(10, 1),
+ RecordFieldType.DECIMAL.getDecimalDataType(10, 3),
+ RecordFieldType.DECIMAL.getDecimalDataType(7, 3),
+ RecordFieldType.DECIMAL.getDecimalDataType(7, 5),
+ RecordFieldType.DECIMAL.getDecimalDataType(7, 7),
+ RecordFieldType.FLOAT.getDataType(),
+ RecordFieldType.DOUBLE.getDataType()
+ );
+
+ DataType expected = RecordFieldType.DECIMAL.getDecimalDataType(10, 7);
+
+ // WHEN
+ // THEN
+ runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, dataTypes, expected);
+ }
+
private SimpleRecordSchema createRecordSchema(String fieldName, DataType fieldType) {
return new SimpleRecordSchema(Arrays.asList(
new RecordField(fieldName, fieldType)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
index 6ab9229..0cdd217 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestWriteXMLResult.java
@@ -189,6 +189,7 @@ public class TestWriteXMLResult {
valueMap.put("long", 8L);
valueMap.put("float", 8.0F);
valueMap.put("double", 8.0D);
+ valueMap.put("decimal", 8.1D);
valueMap.put("date", new Date(time));
valueMap.put("time", new Time(time));
valueMap.put("timestamp", new Timestamp(time));
@@ -207,8 +208,8 @@ public class TestWriteXMLResult {
writer.flush();
String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><char>c</char><short>8</short>" +
- "<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><date>2017-01-01</date>" +
- "<time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
+ "<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><decimal>8.1</decimal>" +
+ "<date>2017-01-01</date><time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
"<map><height>48</height><width>96</width></map></RECORD></ROOT>";
assertThat(xmlResult, CompareMatcher.isSimilarTo(out.toString()).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
index ef3d692..252b572 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordReader.java
@@ -33,6 +33,7 @@ import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -401,6 +402,22 @@ public class TestXMLRecordReader {
}
@Test
+ public void testSimpleRecordWithAttribute6() throws IOException, MalformedRecordException {
+ // given
+ final InputStream is = new FileInputStream("src/test/resources/xml/people2.xml");
+ final List<RecordField> fields = getSimpleRecordFields();
+ fields.add(new RecordField("ID", RecordFieldType.DECIMAL.getDecimalDataType(38, 10)));
+ final XMLRecordReader reader = new XMLRecordReader(is, new SimpleRecordSchema(fields), true,
+ null, "CONTENT", dateFormat, timeFormat, timestampFormat, Mockito.mock(ComponentLog.class));
+
+ // when
+ final Record record = reader.nextRecord(true, false);
+
+ // then
+ assertEquals(BigDecimal.class, record.getValue("ID").getClass());
+ }
+
+ @Test
public void testSimpleRecordWithAttributeCoerceFalseDropFalse() throws IOException, MalformedRecordException {
InputStream is = new FileInputStream("src/test/resources/xml/people.xml");
List<RecordField> fields = getSimpleRecordFields();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc
new file mode 100644
index 0000000..7529378
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals.avsc
@@ -0,0 +1,36 @@
+{
+ "namespace": "nifi",
+ "name": "data_types",
+ "type": "record",
+ "fields": [
+ {
+ "name" : "decimal1", "type": {
+ "type" : "bytes",
+ "logicalType" : "decimal",
+ "precision" : 10,
+ "scale" : 2
+ }
+ }, {
+ "name" : "decimal2", "type": {
+ "type" : "bytes",
+ "logicalType" : "decimal",
+ "precision" : 10,
+ "scale" : 2
+ }
+ }, {
+ "name" : "decimal3", "type": {
+ "type" : "bytes",
+ "logicalType" : "decimal",
+ "precision" : 10,
+ "scale" : 2
+ }
+ }, {
+ "name" : "decimal4", "type": {
+ "type" : "bytes",
+ "logicalType" : "decimal",
+ "precision" : 10,
+ "scale" : 2
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
index 0472512..e6de436 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
@@ -7,6 +7,7 @@
"bigint" : 8,
"float" : 8.0,
"double" : 8.0,
+ "decimal" : 8.1,
"timestamp" : "2017-01-01 17:00:00",
"date" : "2017-01-01",
"time" : "17:00:00",