You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/25 18:40:26 UTC

nifi git commit: NIFI-3958: Decimal logical type with undefined precision and scale.

Repository: nifi
Updated Branches:
  refs/heads/master 36911957d -> 13b59b562


NIFI-3958: Decimal logical type with undefined precision and scale.

- Oracle NUMBER can return 0 precision and -127 or 0 scale with variable scale NUMBER such as ROWNUM or function result
- Added 'Default Decimal Precision' and 'Default Decimal Scale' property to ExecuteSQL and QueryDatabaseTable to apply default precision and scale if those are unknown
- Coerce BigDecimal scale to field schema logical type, so that BigDecimals having different scale can be written

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1851


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/13b59b56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/13b59b56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/13b59b56

Branch: refs/heads/master
Commit: 13b59b56210eff6f06f6cfb3020953666d502604
Parents: 3691195
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu May 25 00:53:09 2017 +0900
Committer: Matt Burgess <ma...@apache.org>
Committed: Thu May 25 14:37:17 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  6 +-
 .../nifi/processors/standard/ExecuteSQL.java    | 11 ++-
 .../processors/standard/QueryDatabaseTable.java |  6 ++
 .../processors/standard/util/JdbcCommon.java    | 88 +++++++++++++++++---
 .../standard/util/TestJdbcCommon.java           | 36 +++++++-
 5 files changed, 129 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 19697d2..52c55fc 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -335,8 +335,12 @@ public class AvroTypeUtil {
                     final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
                     final BigDecimal decimal;
                     if (rawValue instanceof BigDecimal) {
-                        decimal = (BigDecimal) rawValue;
+                        final BigDecimal rawDecimal = (BigDecimal) rawValue;
+                        final int desiredScale = decimalType.getScale();
+                        // If the desired scale differs from this value's scale, coerce the value to the desired scale.
+                        decimal = rawDecimal.scale() == desiredScale ? rawDecimal : rawDecimal.setScale(desiredScale, BigDecimal.ROUND_HALF_UP);
                     } else if (rawValue instanceof Double) {
+                        // Scale is adjusted based on precision. If double was 123.456 and precision is 5, then decimal would be 123.46.
                         decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
                     } else {
                         throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");

http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 3ef9107..3f05766 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -55,6 +55,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
 
@@ -125,6 +127,8 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(DEFAULT_PRECISION);
+        pds.add(DEFAULT_SCALE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -168,6 +172,8 @@ public class ExecuteSQL extends AbstractProcessor {
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+        final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
         final StopWatch stopWatch = new StopWatch(true);
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -200,7 +206,10 @@ public class ExecuteSQL extends AbstractProcessor {
                         final ResultSet resultSet = st.executeQuery(selectQuery);
                         final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                                 .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes).build();
+                                .useLogicalTypes(useAvroLogicalTypes)
+                                .defaultPrecision(defaultPrecision)
+                                .defaultScale(defaultScale)
+                                .build();
                         nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
                     } catch (final SQLException e) {
                         throw new ProcessException(e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index dd3ac7b..f23b228 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -68,6 +68,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
 
@@ -160,6 +162,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(DEFAULT_PRECISION);
+        pds.add(DEFAULT_SCALE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -212,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 .maxRows(maxRowsPerFlowFile)
                 .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
                 .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
+                .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
+                .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
                 .build();
 
         final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());

http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 8771fe9..97d5cc1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -73,12 +73,19 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
 
 /**
  * JDBC / SQL common functions.
  */
 public class JdbcCommon {
 
+    private static final int MAX_DIGITS_IN_BIGINT = 19;
+    private static final int MAX_DIGITS_IN_INT = 9;
+    // Derived from MySQL default precision.
+    private static final int DEFAULT_PRECISION_VALUE = 10;
+    private static final int DEFAULT_SCALE_VALUE = 0;
+
     public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
 
     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@@ -107,9 +114,36 @@ public class JdbcCommon {
             .required(true)
             .build();
 
+    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
+            .name("dbf-default-precision")
+            .displayName("Default Decimal Precision")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'precision' denoting number of available digits is required."
+                    + " Generally, precision is defined by column data type definition or database engine's default."
+                    + " However undefined precision (0) can be returned from some database engines."
+                    + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
+            .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
+            .name("dbf-default-scale")
+            .displayName("Default Decimal Scale")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'scale' denoting number of available decimal digits is required."
+                    + " Generally, scale is defined by column data type definition or database engine's default."
+                    + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
+                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
+                    + " If a value has more decimals than specified scale, then the value will be rounded (half-up),"
+                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
+            .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
 
-    private static final int MAX_DIGITS_IN_BIGINT = 19;
-    private static final int MAX_DIGITS_IN_INT = 9;
 
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
@@ -140,12 +174,16 @@ public class JdbcCommon {
         private final int maxRows;
         private final boolean convertNames;
         private final boolean useLogicalTypes;
+        private final int defaultPrecision;
+        private final int defaultScale;
 
-        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
+        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes, int defaultPrecision, int defaultScale) {
             this.recordName = recordName;
             this.maxRows = maxRows;
             this.convertNames = convertNames;
             this.useLogicalTypes = useLogicalTypes;
+            this.defaultPrecision = defaultPrecision;
+            this.defaultScale = defaultScale;
         }
 
         public static Builder builder() {
@@ -157,6 +195,8 @@ public class JdbcCommon {
             private int maxRows = 0;
             private boolean convertNames = false;
             private boolean useLogicalTypes = false;
+            private int defaultPrecision = DEFAULT_PRECISION_VALUE;
+            private int defaultScale = DEFAULT_SCALE_VALUE;
 
             /**
              * Specify a priori record name to use if it cannot be determined from the result set.
@@ -181,8 +221,18 @@ public class JdbcCommon {
                 return this;
             }
 
+            public Builder defaultPrecision(int defaultPrecision) {
+                this.defaultPrecision = defaultPrecision;
+                return this;
+            }
+
+            public Builder defaultScale(int defaultScale) {
+                this.defaultScale = defaultScale;
+                return this;
+            }
+
             public AvroConversionOptions build() {
-                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
+                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale);
             }
         }
     }
@@ -455,14 +505,28 @@ public class JdbcCommon {
                 // Since Avro 1.8, LogicalType is supported.
                 case DECIMAL:
                 case NUMERIC:
-
-                    final LogicalTypes.Decimal decimal = options.useLogicalTypes
-                            ? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
-                    addNullableField(builder, columnName,
-                            u -> options.useLogicalTypes
-                                    ? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
-                                    : u.stringType());
-
+                    if (options.useLogicalTypes) {
+                        final int decimalPrecision;
+                        final int decimalScale;
+                        if (meta.getPrecision(i) > 0) {
+                            // When database returns a certain precision, we can rely on that.
+                            decimalPrecision = meta.getPrecision(i);
+                            decimalScale = meta.getScale(i);
+                        } else {
+                            // If not, use default precision.
+                            decimalPrecision = options.defaultPrecision;
+                            // Oracle returns precision=0, scale=-127 for variable scale value such as ROWNUM or function result.
+                            // Specifying 'oracle.jdbc.J2EE13Compliant' SystemProperty makes it return scale=0 instead.
+                            // Queries such as 'SELECT 1.23 as v from DUAL', for example, can be problematic because it can't be mapped with decimal with scale=0.
+                            // Default scale is used to preserve decimals in such case.
+                            decimalScale = meta.getScale(i) > 0 ? meta.getScale(i) : options.defaultScale;
+                        }
+                        final LogicalTypes.Decimal decimal = LogicalTypes.decimal(decimalPrecision, decimalScale);
+                        addNullableField(builder, columnName,
+                                u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));
+                    } else {
+                        addNullableField(builder, columnName, u -> u.stringType());
+                    }
                     break;
 
                 case DATE:

http://git-wip-us.apache.org/repos/asf/nifi/blob/13b59b56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index 37660d5..830567c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -31,6 +31,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.sql.Connection;
@@ -55,6 +57,8 @@ import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 
 import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
@@ -362,15 +366,29 @@ public class TestJdbcCommon {
 
     @Test
     public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
-        final BigDecimal bigDecimal = new BigDecimal(38D);
+        final BigDecimal bigDecimal = new BigDecimal(12345D);
+        // If db returns a precision, it should be used.
+        testConvertToAvroStreamForBigDecimal(bigDecimal, 38, 10, 38, 0);
+    }
+
+    @Test
+    public void testConvertToAvroStreamForBigDecimalWithUndefinedPrecision() throws SQLException, IOException {
+        final int expectedScale = 3;
+        final int dbPrecision = 0;
+        final BigDecimal bigDecimal = new BigDecimal(new BigInteger("12345"), expectedScale, new MathContext(dbPrecision));
+        // If db doesn't return a precision, default precision should be used.
+        testConvertToAvroStreamForBigDecimal(bigDecimal, dbPrecision, 24, 24, expectedScale);
+    }
+
+    private void testConvertToAvroStreamForBigDecimal(BigDecimal bigDecimal, int dbPrecision, int defaultPrecision, int expectedPrecision, int expectedScale) throws SQLException, IOException {
 
         final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
         when(metadata.getColumnCount()).thenReturn(1);
         when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
         when(metadata.getColumnName(1)).thenReturn("The.Chairman");
         when(metadata.getTableName(1)).thenReturn("1the::table");
-        when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
-        when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
+        when(metadata.getPrecision(1)).thenReturn(dbPrecision);
+        when(metadata.getScale(1)).thenReturn(expectedScale);
 
         final ResultSet rs = mock(ResultSet.class);
         when(rs.getMetaData()).thenReturn(metadata);
@@ -388,7 +406,7 @@ public class TestJdbcCommon {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
-                .builder().convertNames(true).useLogicalTypes(true).build();
+                .builder().convertNames(true).useLogicalTypes(true).defaultPrecision(defaultPrecision).build();
         JdbcCommon.convertToAvroStream(rs, baos, options, null);
 
         final byte[] serializedBytes = baos.toByteArray();
@@ -400,6 +418,16 @@ public class TestJdbcCommon {
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
+            final Schema generatedUnion = dataFileReader.getSchema().getField("The_Chairman").schema();
+            // null and decimal.
+            assertEquals(2, generatedUnion.getTypes().size());
+            final LogicalType logicalType = generatedUnion.getTypes().get(1).getLogicalType();
+            assertNotNull(logicalType);
+            assertEquals("decimal", logicalType.getName());
+            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+            assertEquals(expectedPrecision, decimalType.getPrecision());
+            assertEquals(expectedScale, decimalType.getScale());
+
             GenericRecord record = null;
             while (dataFileReader.hasNext()) {
                 record = dataFileReader.next(record);