You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/02/11 01:02:31 UTC

[nifi] branch main updated: NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d08f024  NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()
d08f024 is described below

commit d08f02428d6313b4acbe4b1b43b238a74addec0c
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 10 17:42:49 2021 -0500

    NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../serialization/record/util/DataTypeUtils.java   |  43 ++++++++
 .../processors/standard/PutDatabaseRecord.java     |  27 ++++-
 .../standard/TestPutDatabaseRecord.groovy          | 113 ++++++++++++++++++++-
 3 files changed, 178 insertions(+), 5 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 2bfa6cf..377a659 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1934,6 +1934,60 @@ public class DataTypeUtils {
         }
     }
 
+    /**
+     * Converts the specified java.sql.Types constant (INTEGER = 4, e.g.) into the closest matching DataType,
+     * following the JDBC type mapping (FLOAT is double precision, REAL is single precision).
+     *
+     * @param sqlType the java.sql.Types constant to be converted
+     * @return the DataType corresponding to the specified SQL type, or null if no mapping is known. Callers must
+     * check for null before passing the result to a conversion method.
+     */
+    public static DataType getDataTypeFromSQLTypeValue(final int sqlType) {
+        switch (sqlType) {
+            case Types.BIGINT:
+                return RecordFieldType.BIGINT.getDataType();
+            case Types.BIT:
+            case Types.BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case Types.TINYINT:
+                return RecordFieldType.BYTE.getDataType();
+            case Types.SMALLINT:
+                return RecordFieldType.SHORT.getDataType();
+            case Types.INTEGER:
+                return RecordFieldType.INT.getDataType();
+            case Types.REAL:
+                return RecordFieldType.FLOAT.getDataType();
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                // JDBC FLOAT is double precision; mapping it to RecordFieldType.FLOAT would silently drop precision
+                return RecordFieldType.DOUBLE.getDataType();
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                return RecordFieldType.DECIMAL.getDataType();
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.LONGNVARCHAR:
+                // A CHAR(n) column holds a string, whereas RecordFieldType.CHAR is a single character; converting a
+                // longer string value to it would truncate the value to its first character
+                return RecordFieldType.STRING.getDataType();
+            case Types.DATE:
+                return RecordFieldType.DATE.getDataType();
+            case Types.TIME:
+                return RecordFieldType.TIME.getDataType();
+            case Types.TIMESTAMP:
+                return RecordFieldType.TIMESTAMP.getDataType();
+            case Types.ARRAY:
+                return RecordFieldType.ARRAY.getDataType();
+            case Types.STRUCT:
+                return RecordFieldType.RECORD.getDataType();
+            default:
+                return null;
+        }
+    }
+
     public static boolean isScalarValue(final DataType dataType, final Object value) {
         final RecordFieldType fieldType = dataType.getFieldType();
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 5ef883a..d007d18 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -57,6 +57,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -78,6 +79,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
@@ -689,12 +691,29 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                     final Object[] values = currentRecord.getValues();
                     final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
+                    List<ColumnDescription> columns = tableSchema.getColumnsAsList();
 
                     for (int i = 0; i < fieldIndexes.size(); i++) {
                         final int currentFieldIndex = fieldIndexes.get(i);
                         Object currentValue = values[currentFieldIndex];
                         final DataType dataType = dataTypes.get(currentFieldIndex);
-                        final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                        final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                        final ColumnDescription column = columns.get(currentFieldIndex);
+                        int sqlType = column.dataType;
+
+                        // Convert (if necessary) from field data type to column data type
+                        if (fieldSqlType != sqlType) {
+                            try {
+                                currentValue = DataTypeUtils.convertType(
+                                        currentValue,
+                                        DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType),
+                                        currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+                            } catch (IllegalTypeConversionException itce) {
+                                // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
+                                // try with the original object and field datatype
+                                sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+                            }
+                        }
 
                         if (sqlType == Types.DATE && currentValue instanceof Date) {
                             // convert Date from the internal UTC normalized form to local time zone needed by database drivers
@@ -1266,7 +1285,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
                             final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
-            this.columns = new HashMap<>();
+            this.columns = new LinkedHashMap<>();
             this.primaryKeyColumnNames = primaryKeyColumnNames;
             this.quotedIdentifierString = quotedIdentifierString;
 
@@ -1283,6 +1302,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
             return columns;
         }
 
+        /**
+         * Returns a new, mutable copy of this table's column descriptions in the column map's iteration order
+         * (insertion order, as the map is a LinkedHashMap). Positions in the returned list are table-column
+         * positions; they do not necessarily match the indexes of fields in an incoming record.
+         *
+         * @return a freshly allocated list of the table's column descriptions
+         */
+        public List<ColumnDescription> getColumnsAsList() {
+            final Collection<ColumnDescription> descriptions = columns.values();
+            return new ArrayList<>(descriptions);
+        }
+
         public List<String> getRequiredColumnNames() {
             return requiredColumnNames;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d252f5c..7e96d0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -45,6 +45,7 @@ import java.sql.PreparedStatement
 import java.sql.ResultSet
 import java.sql.SQLDataException
 import java.sql.SQLException
+import java.sql.SQLFeatureNotSupportedException
 import java.sql.SQLNonTransientConnectionException
 import java.sql.Statement
 import java.time.LocalDate
@@ -216,21 +217,21 @@ class TestPutDatabaseRecord {
                 generateInsert(schema, 'PERSONS', tableSchema, settings)
                 fail('generateInsert should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
             }
 
             try {
                 generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
                 fail('generateUpdate should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
             }
 
             try {
                 generateDelete(schema, 'PERSONS', tableSchema, settings)
                 fail('generateDelete should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
             }
         }
     }
@@ -1190,4 +1191,104 @@ class TestPutDatabaseRecord {
 
         }
     }
+
+    @Test
+    void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("dt", RecordFieldType.BIGINT)
+
+        // "dt" is declared BIGINT (epoch millis) but the column holds dates, so the values must be converted
+        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
+        BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+        BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+        parser.addRecord(1, 'rec1', 101, nifiDate1)
+        parser.addRecord(2, 'rec2', 102, nifiDate2)
+        parser.addRecord(3, 'rec3', 103, null)
+        parser.addRecord(4, 'rec4', 104, null)
+        parser.addRecord(5, null, 105, null)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(101, rs.getInt(3))
+        assertEquals(jdbcDate1, rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(102, rs.getInt(3))
+        assertEquals(jdbcDate2, rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(3, rs.getInt(1))
+        assertEquals('rec3', rs.getString(2))
+        assertEquals(103, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(4, rs.getInt(1))
+        assertEquals('rec4', rs.getString(2))
+        assertEquals(104, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(5, rs.getInt(1))
+        assertNull(rs.getString(2))
+        assertEquals(105, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
+    void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.STRING)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+        parser.addSchemaField("dt", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.FLOAT.getDataType()).getFieldType())
+
+        // The float arrays in "dt" cannot be converted to the column's date type, so PutDatabaseRecord falls back
+        // to the field's own (ARRAY) type when setting the parameter
+        parser.addRecord('1', 'rec1', 101, [1.0,2.0])
+        parser.addRecord('2', 'rec2', 102, [3.0,4.0])
+        parser.addRecord('3', 'rec3', 103, null)
+        parser.addRecord('4', 'rec4', 104, null)
+        parser.addRecord('5', null, 105, null)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        // Derby throws a SQLFeatureNotSupportedException when asked to store a value as an ARRAY
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+    }
 }