You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/04/30 15:42:07 UTC
[nifi] branch main updated: NIFI-8320: Fix column mismatch in
PutDatabaseRecord
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3963f66 NIFI-8320: Fix column mismatch in PutDatabaseRecord
3963f66 is described below
commit 3963f66dffcdc0ca8c3c5a35844007c077d77f41
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Apr 22 16:47:16 2021 -0400
NIFI-8320: Fix column mismatch in PutDatabaseRecord
This closes #5024
Signed-off-by: David Handermann <ex...@apache.org>
---
.../processors/standard/PutDatabaseRecord.java | 23 ++-
.../standard/TestPutDatabaseRecord.groovy | 114 ++++++++++++++
.../processors/standard/PutDatabaseRecordTest.java | 171 +++++++++++++++++++++
3 files changed, 304 insertions(+), 4 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 65f0a7e..9b2251f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -691,15 +691,30 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Object[] values = currentRecord.getValues();
final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
- List<ColumnDescription> columns = tableSchema.getColumnsAsList();
+ final RecordSchema recordSchema = currentRecord.getSchema();
+ final Map<String, ColumnDescription> columns = tableSchema.getColumns();
for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex);
final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
- final ColumnDescription column = columns.get(currentFieldIndex);
- int sqlType = column.dataType;
+ final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName();
+ String columnName = normalizeColumnName(fieldName, settings.translateFieldNames);
+ int sqlType;
+
+ final ColumnDescription column = columns.get(columnName);
+ // 'column' should not be null here as the fieldIndexes should correspond to fields that match table columns, but better to handle just in case
+ if (column == null) {
+ if (!settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", columns.keySet()));
+ } else {
+ sqlType = fieldSqlType;
+ }
+ } else {
+ sqlType = column.dataType;
+ }
// Convert (if necessary) from field data type to column data type
if (fieldSqlType != sqlType) {
@@ -709,7 +724,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
currentValue = DataTypeUtils.convertType(
currentValue,
targetDataType,
- currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+ fieldName);
}
} catch (IllegalTypeConversionException itce) {
// If the field and column types don't match or the value can't otherwise be converted to the column datatype,
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 67e3e67..e3c8e4c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -302,6 +302,73 @@ class TestPutDatabaseRecord {
}
@Test
+ void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable(createPersons)
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("dt", RecordFieldType.DATE)
+
+ LocalDate testDate1 = LocalDate.of(2021, 1, 26)
+ Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
+ Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+ LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+ Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
+ Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+ parser.addRecord(1, 'rec1', nifiDate1)
+ parser.addRecord(2, 'rec2', nifiDate2)
+ parser.addRecord(3, 'rec3', null)
+ parser.addRecord(4, 'rec4', null)
+ parser.addRecord(5, null, null)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ final Connection conn = dbcp.getConnection()
+ final Statement stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ // The records supply no 'code' value; assuming the column is left NULL, getInt() reports it as 0
+ assertEquals(0, rs.getInt(3))
+ assertEquals(jdbcDate1, rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertEquals(0, rs.getInt(3))
+ assertEquals(jdbcDate2, rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(3, rs.getInt(1))
+ assertEquals('rec3', rs.getString(2))
+ assertEquals(0, rs.getInt(3))
+ assertNull(rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(4, rs.getInt(1))
+ assertEquals('rec4', rs.getString(2))
+ assertEquals(0, rs.getInt(3))
+ assertNull(rs.getDate(4))
+ assertTrue(rs.next())
+ assertEquals(5, rs.getInt(1))
+ assertNull(rs.getString(2))
+ assertEquals(0, rs.getInt(3))
+ assertNull(rs.getDate(4))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
+ @Test
void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable(createPersons)
final MockRecordParser parser = new MockRecordParser()
@@ -1337,4 +1404,51 @@ class TestPutDatabaseRecord {
stmt.close()
conn.close()
}
+
+ @Test
+ void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException, IOException {
+ // Manually drop (if it exists) and re-create the TEMP table used by this test
+ def conn = dbcp.connection
+ def stmt = conn.createStatement()
+ try {
+ stmt.execute('DROP TABLE TEMP')
+ } catch(ex) {
+ // Do nothing, table may not exist
+ }
+ stmt.execute('CREATE TABLE TEMP (id integer primary key, code integer, name long varchar)')
+
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ // change order of columns
+ parser.addRecord('rec1', 1, 101)
+ parser.addRecord('rec2', 2, 102)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals(101, rs.getInt(2))
+ assertEquals('rec1', rs.getString(3))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals(102, rs.getInt(2))
+ assertEquals('rec2', rs.getString(3))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
new file mode 100644
index 0000000..da45890
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.spy;
+
+
+public class PutDatabaseRecordTest {
+
+ private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
+ " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
+ private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
+ " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
+ private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
+ " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
+ private final static String DB_LOCATION = "target/db_pdr";
+
+ TestRunner runner;
+ PutDatabaseRecord processor;
+ DBCPServiceSimpleImpl dbcp;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws IOException {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ignore) {
+ // Do nothing, may not have existed
+ }
+ }
+
+ @AfterClass
+ public static void cleanUpAfterClass() throws Exception {
+ try {
+ DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+ } catch (SQLNonTransientConnectionException ignore) {
+ // Do nothing, this is what happens at Derby shutdown
+ }
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ignore) {
+ // Do nothing, may not have existed
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new PutDatabaseRecord();
+ //Mock the DBCP Controller Service so we can control the Results
+ dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
+
+ final Map<String, String> dbcpProperties = new HashMap<>();
+
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+ }
+
+ @Test
+ public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException, SQLException, IOException {
+ // Need to override the @Before method with a new processor that behaves badly
+ processor = new PutDatabaseRecordUnmatchedField();
+ //Mock the DBCP Controller Service so we can control the Results
+ dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
+
+ final Map<String, String> dbcpProperties = new HashMap<>();
+
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+
+ recreateTable(createPersons);
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("extra", RecordFieldType.STRING);
+ parser.addSchemaField("dt", RecordFieldType.DATE);
+
+ LocalDate testDate1 = LocalDate.of(2021, 1, 26);
+ Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC
+ Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+ LocalDate testDate2 = LocalDate.of(2021, 7, 26);
+ Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC
+ Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+
+ parser.addRecord(1, "rec1", "test", nifiDate1);
+ parser.addRecord(2, "rec2", "test", nifiDate2);
+ parser.addRecord(3, "rec3", "test", null);
+ parser.addRecord(4, "rec4", "test", null);
+ parser.addRecord(5, null, null, null);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+ runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD);
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 0);
+ runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
+ }
+
+ /** Drops PERSONS if it exists, then creates it from createSQL; only a failed DROP is ignored. */
+ private void recreateTable(String createSQL) throws ProcessException, SQLException {
+ try (final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement()) {
+ try {
+ stmt.execute("drop table PERSONS");
+ } catch (SQLException ignored) { /* table may not exist yet; the CREATE below must still run */ }
+ stmt.execute(createSQL);
+ }
+ }
+
+ static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
+ @Override
+ SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, SQLException {
+ return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0,1,2,3));
+ }
+ }
+}
\ No newline at end of file