You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/25 18:40:26 UTC
nifi git commit: NIFI-3958: Decimal logical type with undefined
precision and scale.
Repository: nifi
Updated Branches:
refs/heads/master 36911957d -> 13b59b562
NIFI-3958: Decimal logical type with undefined precision and scale.
- Oracle NUMBER can return a precision of 0 and a scale of -127 or 0 for variable-scale NUMBER values such as ROWNUM or function results
- Added 'Default Decimal Precision' and 'Default Decimal Scale' property to ExecuteSQL and QueryDatabaseTable to apply default precision and scale if those are unknown
- Coerce BigDecimal scale to field schema logical type, so that BigDecimals having different scale can be written
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1851
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/13b59b56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/13b59b56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/13b59b56
Branch: refs/heads/master
Commit: 13b59b56210eff6f06f6cfb3020953666d502604
Parents: 3691195
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu May 25 00:53:09 2017 +0900
Committer: Matt Burgess <ma...@apache.org>
Committed: Thu May 25 14:37:17 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 6 +-
.../nifi/processors/standard/ExecuteSQL.java | 11 ++-
.../processors/standard/QueryDatabaseTable.java | 6 ++
.../processors/standard/util/JdbcCommon.java | 88 +++++++++++++++++---
.../standard/util/TestJdbcCommon.java | 36 +++++++-
5 files changed, 129 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 19697d2..52c55fc 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -335,8 +335,12 @@ public class AvroTypeUtil {
final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
final BigDecimal decimal;
if (rawValue instanceof BigDecimal) {
- decimal = (BigDecimal) rawValue;
+ final BigDecimal rawDecimal = (BigDecimal) rawValue;
+ final int desiredScale = decimalType.getScale();
+ // If the desired scale differs from this value's scale, coerce it to the desired scale.
+ decimal = rawDecimal.scale() == desiredScale ? rawDecimal : rawDecimal.setScale(desiredScale, BigDecimal.ROUND_HALF_UP);
} else if (rawValue instanceof Double) {
+ // Scale is adjusted based on precision. If double was 123.456 and precision is 5, then decimal would be 123.46.
decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 3ef9107..3f05766 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -55,6 +55,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@@ -125,6 +127,8 @@ public class ExecuteSQL extends AbstractProcessor {
pds.add(QUERY_TIMEOUT);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
pds.add(USE_AVRO_LOGICAL_TYPES);
+ pds.add(DEFAULT_PRECISION);
+ pds.add(DEFAULT_SCALE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -168,6 +172,8 @@ public class ExecuteSQL extends AbstractProcessor {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+ final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+ final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
final StopWatch stopWatch = new StopWatch(true);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -200,7 +206,10 @@ public class ExecuteSQL extends AbstractProcessor {
final ResultSet resultSet = st.executeQuery(selectQuery);
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
- .useLogicalTypes(useAvroLogicalTypes).build();
+ .useLogicalTypes(useAvroLogicalTypes)
+ .defaultPrecision(defaultPrecision)
+ .defaultScale(defaultScale)
+ .build();
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (final SQLException e) {
throw new ProcessException(e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index dd3ac7b..f23b228 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -68,6 +68,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@@ -160,6 +162,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
pds.add(MAX_FRAGMENTS);
pds.add(NORMALIZE_NAMES_FOR_AVRO);
pds.add(USE_AVRO_LOGICAL_TYPES);
+ pds.add(DEFAULT_PRECISION);
+ pds.add(DEFAULT_SCALE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -212,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
.maxRows(maxRowsPerFlowFile)
.convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
.useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
+ .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
+ .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
.build();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 8771fe9..97d5cc1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -73,12 +73,19 @@ import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
/**
* JDBC / SQL common functions.
*/
public class JdbcCommon {
+ private static final int MAX_DIGITS_IN_BIGINT = 19;
+ private static final int MAX_DIGITS_IN_INT = 9;
+ // Derived from MySQL default precision.
+ private static final int DEFAULT_PRECISION_VALUE = 10;
+ private static final int DEFAULT_SCALE_VALUE = 0;
+
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@@ -107,9 +114,36 @@ public class JdbcCommon {
.required(true)
.build();
+ public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
+ .name("dbf-default-precision")
+ .displayName("Default Decimal Precision")
+ .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+ + " a specific 'precision' denoting number of available digits is required."
+ + " Generally, precision is defined by column data type definition or database engines default."
+ + " However undefined precision (0) can be returned from some database engines."
+ + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
+ .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
+ .name("dbf-default-scale")
+ .displayName("Default Decimal Scale")
+ .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+ + " a specific 'scale' denoting number of available decimal digits is required."
+ + " Generally, scale is defined by column data type definition or database engines default."
+ + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
+ + " 'Default Decimal Scale' is used when writing those undefined numbers."
+ + " If a value has more decimals than specified scale, then the value will be rounded-up,"
+ + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
+ .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
- private static final int MAX_DIGITS_IN_BIGINT = 19;
- private static final int MAX_DIGITS_IN_INT = 9;
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, null, convertNames);
@@ -140,12 +174,16 @@ public class JdbcCommon {
private final int maxRows;
private final boolean convertNames;
private final boolean useLogicalTypes;
+ private final int defaultPrecision;
+ private final int defaultScale;
- private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
+ private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes, int defaultPrecision, int defaultScale) {
this.recordName = recordName;
this.maxRows = maxRows;
this.convertNames = convertNames;
this.useLogicalTypes = useLogicalTypes;
+ this.defaultPrecision = defaultPrecision;
+ this.defaultScale = defaultScale;
}
public static Builder builder() {
@@ -157,6 +195,8 @@ public class JdbcCommon {
private int maxRows = 0;
private boolean convertNames = false;
private boolean useLogicalTypes = false;
+ private int defaultPrecision = DEFAULT_PRECISION_VALUE;
+ private int defaultScale = DEFAULT_SCALE_VALUE;
/**
* Specify a priori record name to use if it cannot be determined from the result set.
@@ -181,8 +221,18 @@ public class JdbcCommon {
return this;
}
+ public Builder defaultPrecision(int defaultPrecision) {
+ this.defaultPrecision = defaultPrecision;
+ return this;
+ }
+
+ public Builder defaultScale(int defaultScale) {
+ this.defaultScale = defaultScale;
+ return this;
+ }
+
public AvroConversionOptions build() {
- return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
+ return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale);
}
}
}
@@ -455,14 +505,28 @@ public class JdbcCommon {
// Since Avro 1.8, LogicalType is supported.
case DECIMAL:
case NUMERIC:
-
- final LogicalTypes.Decimal decimal = options.useLogicalTypes
- ? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
- addNullableField(builder, columnName,
- u -> options.useLogicalTypes
- ? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
- : u.stringType());
-
+ if (options.useLogicalTypes) {
+ final int decimalPrecision;
+ final int decimalScale;
+ if (meta.getPrecision(i) > 0) {
+ // When database returns a certain precision, we can rely on that.
+ decimalPrecision = meta.getPrecision(i);
+ decimalScale = meta.getScale(i);
+ } else {
+ // If not, use default precision.
+ decimalPrecision = options.defaultPrecision;
+ // Oracle returns precision=0, scale=-127 for variable scale value such as ROWNUM or function result.
+ // Specifying 'oracle.jdbc.J2EE13Compliant' SystemProperty makes it to return scale=0 instead.
+ // Queries for example, 'SELECT 1.23 as v from DUAL' can be problematic because it can't be mapped with decimal with scale=0.
+ // Default scale is used to preserve decimals in such case.
+ decimalScale = meta.getScale(i) > 0 ? meta.getScale(i) : options.defaultScale;
+ }
+ final LogicalTypes.Decimal decimal = LogicalTypes.decimal(decimalPrecision, decimalScale);
+ addNullableField(builder, columnName,
+ u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));
+ } else {
+ addNullableField(builder, columnName, u -> u.stringType());
+ }
break;
case DATE:
http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index 37660d5..830567c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -31,6 +31,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Connection;
@@ -55,6 +57,8 @@ import java.util.function.BiFunction;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
@@ -362,15 +366,29 @@ public class TestJdbcCommon {
@Test
public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
- final BigDecimal bigDecimal = new BigDecimal(38D);
+ final BigDecimal bigDecimal = new BigDecimal(12345D);
+ // If db returns a precision, it should be used.
+ testConvertToAvroStreamForBigDecimal(bigDecimal, 38, 10, 38, 0);
+ }
+
+ @Test
+ public void testConvertToAvroStreamForBigDecimalWithUndefinedPrecision() throws SQLException, IOException {
+ final int expectedScale = 3;
+ final int dbPrecision = 0;
+ final BigDecimal bigDecimal = new BigDecimal(new BigInteger("12345"), expectedScale, new MathContext(dbPrecision));
+ // If db doesn't return a precision, default precision should be used.
+ testConvertToAvroStreamForBigDecimal(bigDecimal, dbPrecision, 24, 24, expectedScale);
+ }
+
+ private void testConvertToAvroStreamForBigDecimal(BigDecimal bigDecimal, int dbPrecision, int defaultPrecision, int expectedPrecision, int expectedScale) throws SQLException, IOException {
final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
when(metadata.getColumnCount()).thenReturn(1);
when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
when(metadata.getColumnName(1)).thenReturn("The.Chairman");
when(metadata.getTableName(1)).thenReturn("1the::table");
- when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
- when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
+ when(metadata.getPrecision(1)).thenReturn(dbPrecision);
+ when(metadata.getScale(1)).thenReturn(expectedScale);
final ResultSet rs = mock(ResultSet.class);
when(rs.getMetaData()).thenReturn(metadata);
@@ -388,7 +406,7 @@ public class TestJdbcCommon {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
- .builder().convertNames(true).useLogicalTypes(true).build();
+ .builder().convertNames(true).useLogicalTypes(true).defaultPrecision(defaultPrecision).build();
JdbcCommon.convertToAvroStream(rs, baos, options, null);
final byte[] serializedBytes = baos.toByteArray();
@@ -400,6 +418,16 @@ public class TestJdbcCommon {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
+ final Schema generatedUnion = dataFileReader.getSchema().getField("The_Chairman").schema();
+ // null and decimal.
+ assertEquals(2, generatedUnion.getTypes().size());
+ final LogicalType logicalType = generatedUnion.getTypes().get(1).getLogicalType();
+ assertNotNull(logicalType);
+ assertEquals("decimal", logicalType.getName());
+ LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+ assertEquals(expectedPrecision, decimalType.getPrecision());
+ assertEquals(expectedScale, decimalType.getScale());
+
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next(record);