You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/02/11 01:02:31 UTC
[nifi] branch main updated: NIFI-8223: This closes #4819. Use
column datatype in PutDatabaseRecord when calling setObject()
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d08f024 NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()
d08f024 is described below
commit d08f02428d6313b4acbe4b1b43b238a74addec0c
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 10 17:42:49 2021 -0500
NIFI-8223: This closes #4819. Use column datatype in PutDatabaseRecord when calling setObject()
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../serialization/record/util/DataTypeUtils.java | 43 ++++++++
.../processors/standard/PutDatabaseRecord.java | 27 ++++-
.../standard/TestPutDatabaseRecord.groovy | 113 ++++++++++++++++++++-
3 files changed, 178 insertions(+), 5 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 2bfa6cf..377a659 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1934,6 +1934,49 @@ public class DataTypeUtils {
}
}
+ /**
+ * Converts the specified java.sql.Types constant (INTEGER = 4, e.g.) into the corresponding record DataType
+ *
+ * @param sqlType the java.sql.Types constant to be converted
+ * @return the DataType corresponding to the specified SQL type, or null if this method has no mapping for it
+ */
+ public static DataType getDataTypeFromSQLTypeValue(final int sqlType) {
+ switch (sqlType) {
+ case Types.BIGINT:
+ return RecordFieldType.BIGINT.getDataType();
+ case Types.BOOLEAN:
+ return RecordFieldType.BOOLEAN.getDataType();
+ case Types.TINYINT:
+ return RecordFieldType.BYTE.getDataType();
+ case Types.CHAR:
+ return RecordFieldType.CHAR.getDataType();
+ case Types.DATE:
+ return RecordFieldType.DATE.getDataType();
+ case Types.DOUBLE:
+ return RecordFieldType.DOUBLE.getDataType();
+ case Types.FLOAT:
+ return RecordFieldType.FLOAT.getDataType();
+ case Types.NUMERIC:
+ return RecordFieldType.DECIMAL.getDataType();
+ case Types.INTEGER:
+ return RecordFieldType.INT.getDataType();
+ case Types.SMALLINT:
+ return RecordFieldType.SHORT.getDataType();
+ case Types.VARCHAR:
+ return RecordFieldType.STRING.getDataType();
+ case Types.TIME:
+ return RecordFieldType.TIME.getDataType();
+ case Types.TIMESTAMP:
+ return RecordFieldType.TIMESTAMP.getDataType();
+ case Types.ARRAY:
+ return RecordFieldType.ARRAY.getDataType();
+ case Types.STRUCT:
+ return RecordFieldType.RECORD.getDataType();
+ default:
+ return null; // SQL type not listed above: no mapping, so callers need to null-check the result
+ }
+ }
+
public static boolean isScalarValue(final DataType dataType, final Object value) {
final RecordFieldType fieldType = dataType.getFieldType();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 5ef883a..d007d18 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -57,6 +57,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.io.IOException;
import java.io.InputStream;
@@ -78,6 +79,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
@@ -689,12 +691,29 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Object[] values = currentRecord.getValues();
final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
+ List<ColumnDescription> columns = tableSchema.getColumnsAsList();
for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex);
- final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+ final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
+ final ColumnDescription column = columns.get(currentFieldIndex);
+ int sqlType = column.dataType;
+
+ // Convert (if necessary) from field data type to column data type
+ if (fieldSqlType != sqlType) {
+ try {
+ currentValue = DataTypeUtils.convertType(
+ currentValue,
+ DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType),
+ currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+ } catch (IllegalTypeConversionException itce) {
+ // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
+ // try with the original object and field datatype
+ sqlType = DataTypeUtils.getSQLTypeValue(dataType);
+ }
+ }
if (sqlType == Types.DATE && currentValue instanceof Date) {
// convert Date from the internal UTC normalized form to local time zone needed by database drivers
@@ -1266,7 +1285,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
private TableSchema(final List<ColumnDescription> columnDescriptions, final boolean translateColumnNames,
final Set<String> primaryKeyColumnNames, final String quotedIdentifierString) {
- this.columns = new HashMap<>();
+ this.columns = new LinkedHashMap<>();
this.primaryKeyColumnNames = primaryKeyColumnNames;
this.quotedIdentifierString = quotedIdentifierString;
@@ -1283,6 +1302,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
return columns;
}
+ public List<ColumnDescription> getColumnsAsList() {
+ return new ArrayList<>(columns.values()); // fresh copy of the column descriptions, in the columns map's iteration order
+ }
+
public List<String> getRequiredColumnNames() {
return requiredColumnNames;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index d252f5c..7e96d0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -45,6 +45,7 @@ import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLDataException
import java.sql.SQLException
+import java.sql.SQLFeatureNotSupportedException
import java.sql.SQLNonTransientConnectionException
import java.sql.Statement
import java.time.LocalDate
@@ -216,21 +217,21 @@ class TestPutDatabaseRecord {
generateInsert(schema, 'PERSONS', tableSchema, settings)
fail('generateInsert should fail with unmatched fields')
} catch (SQLDataException e) {
- assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+ assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
}
try {
generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
fail('generateUpdate should fail with unmatched fields')
} catch (SQLDataException e) {
- assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+ assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
}
try {
generateDelete(schema, 'PERSONS', tableSchema, settings)
fail('generateDelete should fail with unmatched fields')
} catch (SQLDataException e) {
- assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
+ assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: id,name,code", e.getMessage())
}
}
}
@@ -1190,4 +1191,110 @@ class TestPutDatabaseRecord {
}
}
+
+ @Test
+ void testInsertMismatchedCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable(createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+ parser.addSchemaField("dt", RecordFieldType.BIGINT) // record field is BIGINT; the assertions below read this column back with rs.getDate(4)
+
+ LocalDate testDate1 = LocalDate.of(2021, 1, 26)
+ BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+ Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+ LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+ BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+ Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+ parser.addRecord(1, 'rec1', 101, nifiDate1) // dt supplied as epoch milliseconds
+ parser.addRecord(2, 'rec2', 102, nifiDate2)
+ parser.addRecord(3, 'rec3', 103, null)
+ parser.addRecord(4, 'rec4', 104, null)
+ parser.addRecord(5, null, 105, null)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) // the single enqueued flow file is expected on success
+ final Connection conn = dbcp.getConnection()
+ final Statement stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertEquals(101, rs.getInt(3))
+ assertEquals(jdbcDate1, rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertEquals(102, rs.getInt(3))
+ assertEquals(jdbcDate2, rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(3, rs.getInt(1))
+ assertEquals('rec3', rs.getString(2))
+ assertEquals(103, rs.getInt(3))
+ assertNull(rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(4, rs.getInt(1))
+ assertEquals('rec4', rs.getString(2))
+ assertEquals(104, rs.getInt(3))
+ assertNull(rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(5, rs.getInt(1))
+ assertNull(rs.getString(2))
+ assertEquals(105, rs.getInt(3))
+ assertNull(rs.getDate(4))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
+
+ @Test
+ void testInsertMismatchedNotCompatibleDataTypes() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable(createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.STRING) // id is supplied as a string ('1'..'5' below)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+ parser.addSchemaField("dt", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.FLOAT.getDataType()).getFieldType()); // presumably resolves to RecordFieldType.ARRAY - confirm
+
+ LocalDate testDate1 = LocalDate.of(2021, 1, 26) // NOTE(review): the date locals declared here through jdbcDate2 are not referenced later in this test
+ BigInteger nifiDate1 = testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+ Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+ LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+ BigInteger nifiDate2 = testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli() // in UTC
+ Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+ parser.addRecord('1', 'rec1', 101, [1.0,2.0])
+ parser.addRecord('2', 'rec2', 102, [3.0,4.0])
+ parser.addRecord('3', 'rec3', 103, null)
+ parser.addRecord('4', 'rec4', 104, null)
+ parser.addRecord('5', null, 105, null)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ // A SQLFeatureNotSupportedException is expected from Derby when you try to put the data as an ARRAY
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+ runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+
+ }
}