You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/02/23 15:08:31 UTC
[nifi] branch main updated: NIFI-8237: This closes #4835. Added
missing SQL types to getDataTypeFromSQLTypeValue(), added defensive code
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 200c04c NIFI-8237: This closes #4835. Added missing SQL types to getDataTypeFromSQLTypeValue(), added defensive code
200c04c is described below
commit 200c04c6d04ef56b45e8d85621e834c67f22cea4
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Feb 22 16:10:54 2021 -0500
NIFI-8237: This closes #4835. Added missing SQL types to getDataTypeFromSQLTypeValue(), added defensive code
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../serialization/record/util/DataTypeUtils.java | 7 ++++
.../processors/standard/PutDatabaseRecord.java | 11 ++++--
.../standard/TestPutDatabaseRecord.groovy | 44 +++++++++++++++++++++-
3 files changed, 56 insertions(+), 6 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 377a659..61e50ff 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -75,6 +75,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+
public class DataTypeUtils {
private static final Logger logger = LoggerFactory.getLogger(DataTypeUtils.class);
@@ -1963,6 +1964,12 @@ public class DataTypeUtils {
case Types.SMALLINT:
return RecordFieldType.SHORT.getDataType();
case Types.VARCHAR:
+ case Types.LONGNVARCHAR:
+ case Types.LONGVARCHAR:
+ case Types.NCHAR:
+ case Types.NVARCHAR:
+ case Types.OTHER:
+ case Types.SQLXML:
return RecordFieldType.STRING.getDataType();
case Types.TIME:
return RecordFieldType.TIME.getDataType();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index d007d18..65f0a7e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -704,10 +704,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
// Convert (if necessary) from field data type to column data type
if (fieldSqlType != sqlType) {
try {
- currentValue = DataTypeUtils.convertType(
- currentValue,
- DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType),
- currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+ DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType);
+ if (targetDataType != null) {
+ currentValue = DataTypeUtils.convertType(
+ currentValue,
+ targetDataType,
+ currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+ }
} catch (IllegalTypeConversionException itce) {
// If the field and column types don't match or the value can't otherwise be converted to the column datatype,
// try with the original object and field datatype
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 7e96d0d..67e3e67 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -45,7 +45,6 @@ import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLDataException
import java.sql.SQLException
-import java.sql.SQLFeatureNotSupportedException
import java.sql.SQLNonTransientConnectionException
import java.sql.Statement
import java.time.LocalDate
@@ -58,7 +57,6 @@ import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertNull
import static org.junit.Assert.assertTrue
import static org.junit.Assert.fail
-
import static org.mockito.ArgumentMatchers.anyMap
import static org.mockito.Mockito.doAnswer
import static org.mockito.Mockito.spy
@@ -1295,6 +1293,48 @@ class TestPutDatabaseRecord {
// A SQLFeatureNotSupportedException exception is expected from Derby when you try to put the data as an ARRAY
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+ }
+
+ @Test
+ void testLongVarchar() throws InitializationException, ProcessException, SQLException, IOException {
+ // Manually create and drop the tables and schemas
+ def conn = dbcp.connection
+ def stmt = conn.createStatement()
+ try {
+ stmt.execute('DROP TABLE TEMP')
+ } catch(ex) {
+ // Do nothing, table may not exist
+ }
+ stmt.execute('CREATE TABLE TEMP (id integer primary key, name long varchar)')
+
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+
+ parser.addRecord(1, 'rec1')
+ parser.addRecord(2, 'rec2')
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
}
}