Posted to commits@nifi.apache.org by ex...@apache.org on 2021/04/30 15:42:07 UTC

[nifi] branch main updated: NIFI-8320: Fix column mismatch in PutDatabaseRecord

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3963f66  NIFI-8320: Fix column mismatch in PutDatabaseRecord
3963f66 is described below

commit 3963f66dffcdc0ca8c3c5a35844007c077d77f41
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Apr 22 16:47:16 2021 -0400

    NIFI-8320: Fix column mismatch in PutDatabaseRecord
    
    This closes #5024
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
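For context: prior to this change, the loop resolved each record field's target
column by its positional index in the table's column list, so a record whose
field order differed from the table's column order would bind values to the
wrong columns. The change resolves columns by normalized field name instead.
Below is a minimal, self-contained sketch of the difference; the class and the
upper-casing normalization are simplified stand-ins for illustration, not
NiFi's actual TableSchema/ColumnDescription API.

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ColumnLookupSketch {
    public static void main(String[] args) {
        // Table columns in DDL order (as in the TEMP table test below): id, code, name
        final List<String> tableColumns = List.of("ID", "CODE", "NAME");
        // Record fields arrive in a different order: name, id, code
        final List<String> recordFields = List.of("name", "id", "code");

        // Old behavior: positional lookup pairs field 0 ("name") with column 0 ("ID")
        for (int i = 0; i < recordFields.size(); i++) {
            System.out.printf("positional: %-4s -> %s%n", recordFields.get(i), tableColumns.get(i));
        }

        // New behavior: index the columns by normalized name (upper-casing here is a
        // simplified stand-in for normalizeColumnName(fieldName, settings.translateFieldNames))
        final Map<String, String> columnsByName = tableColumns.stream()
                .collect(Collectors.toMap(Function.identity(), Function.identity()));
        for (String field : recordFields) {
            System.out.printf("by name:    %-4s -> %s%n", field, columnsByName.get(field.toUpperCase()));
        }
    }
}

Under the positional lookup the sketch pairs "name" with the ID column; under
the name-based lookup each field binds to its own column, which is the behavior
the new testInsertWithDifferentColumnOrdering test verifies.
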
 .../processors/standard/PutDatabaseRecord.java     |  23 ++-
 .../standard/TestPutDatabaseRecord.groovy          | 114 ++++++++++++++
 .../processors/standard/PutDatabaseRecordTest.java | 171 +++++++++++++++++++++
 3 files changed, 304 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 65f0a7e..9b2251f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -691,15 +691,30 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                     final Object[] values = currentRecord.getValues();
                     final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
-                    List<ColumnDescription> columns = tableSchema.getColumnsAsList();
+                    final RecordSchema recordSchema = currentRecord.getSchema();
+                    final Map<String, ColumnDescription> columns = tableSchema.getColumns();
 
                     for (int i = 0; i < fieldIndexes.size(); i++) {
                         final int currentFieldIndex = fieldIndexes.get(i);
                         Object currentValue = values[currentFieldIndex];
                         final DataType dataType = dataTypes.get(currentFieldIndex);
                         final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
-                        final ColumnDescription column = columns.get(currentFieldIndex);
-                        int sqlType = column.dataType;
+                        final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName();
+                        String columnName = normalizeColumnName(fieldName, settings.translateFieldNames);
+                        int sqlType;
+
+                        final ColumnDescription column = columns.get(columnName);
+                        // 'column' should not be null here, since fieldIndexes should only contain fields that match table columns, but handle it just in case
+                        if (column == null) {
+                            if (!settings.ignoreUnmappedFields) {
+                                throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+                                        + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", columns.keySet()));
+                            } else {
+                                sqlType = fieldSqlType;
+                            }
+                        } else {
+                            sqlType = column.dataType;
+                        }
 
                         // Convert (if necessary) from field data type to column data type
                         if (fieldSqlType != sqlType) {
@@ -709,7 +724,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                     currentValue = DataTypeUtils.convertType(
                                             currentValue,
                                             targetDataType,
-                                            currentRecord.getSchema().getField(currentFieldIndex).getFieldName());
+                                            fieldName);
                                 }
                             } catch (IllegalTypeConversionException itce) {
                                 // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 67e3e67..e3c8e4c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -302,6 +302,73 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testInsertNonRequiredColumns() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("dt", RecordFieldType.DATE)
+
+        LocalDate testDate1 = LocalDate.of(2021, 1, 26)
+        Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
+        Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
+        LocalDate testDate2 = LocalDate.of(2021, 7, 26)
+        Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
+        Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
+
+        parser.addRecord(1, 'rec1', nifiDate1)
+        parser.addRecord(2, 'rec2', nifiDate2)
+        parser.addRecord(3, 'rec3', null)
+        parser.addRecord(4, 'rec4', null)
+        parser.addRecord(5, null, null)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        // The 'code' column was not populated, so it is NULL; getInt() returns 0 for SQL NULL
+        assertEquals(0, rs.getInt(3))
+        assertEquals(jdbcDate1, rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(0, rs.getInt(3))
+        assertEquals(jdbcDate2, rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(3, rs.getInt(1))
+        assertEquals('rec3', rs.getString(2))
+        assertEquals(0, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(4, rs.getInt(1))
+        assertEquals('rec4', rs.getString(2))
+        assertEquals(0, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertTrue(rs.next())
+        assertEquals(5, rs.getInt(1))
+        assertNull(rs.getString(2))
+        assertEquals(0, rs.getInt(3))
+        assertNull(rs.getDate(4))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
     void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
         recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
@@ -1337,4 +1404,51 @@ class TestPutDatabaseRecord {
         stmt.close()
         conn.close()
     }
+
+    @Test
+    void testInsertWithDifferentColumnOrdering() throws InitializationException, ProcessException, SQLException, IOException {
+        // Manually create and drop the tables and schemas
+        def conn = dbcp.connection
+        def stmt = conn.createStatement()
+        try {
+            stmt.execute('DROP TABLE TEMP')
+        } catch(ex) {
+            // Do nothing, table may not exist
+        }
+        stmt.execute('CREATE TABLE TEMP (id integer primary key, code integer, name long varchar)')
+
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        // change order of columns
+        parser.addRecord('rec1', 1, 101)
+        parser.addRecord('rec2', 2, 102)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'TEMP')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        ResultSet rs = stmt.executeQuery('SELECT * FROM TEMP')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals(101, rs.getInt(2))
+        assertEquals('rec1', rs.getString(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals(102, rs.getInt(2))
+        assertEquals('rec2', rs.getString(3))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
new file mode 100644
index 0000000..da45890
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.spy;
+
+
+public class PutDatabaseRecordTest {
+
+    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
+    private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
+    private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)";
+    private final static String DB_LOCATION = "target/db_pdr";
+
+    TestRunner runner;
+    PutDatabaseRecord processor;
+    DBCPServiceSimpleImpl dbcp;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws IOException {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    @AfterClass
+    public static void cleanUpAfterClass() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+        } catch (SQLNonTransientConnectionException ignore) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ignore) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new PutDatabaseRecord();
+        // Mock the DBCP Controller Service so we can control the results
+        dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
+
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+    }
+
+    @Test
+    public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException, SQLException, IOException {
+        // Need to override the @Before method with a new processor that behaves badly
+        processor = new PutDatabaseRecordUnmatchedField();
+        // Mock the DBCP Controller Service so we can control the results
+        dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
+
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+
+        recreateTable(createPersons);
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("extra", RecordFieldType.STRING);
+        parser.addSchemaField("dt", RecordFieldType.DATE);
+
+        LocalDate testDate1 = LocalDate.of(2021, 1, 26);
+        Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC
+        Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+        LocalDate testDate2 = LocalDate.of(2021, 7, 26);
+        Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in UTC
+        Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+
+        parser.addRecord(1, "rec1", "test", nifiDate1);
+        parser.addRecord(2, "rec2", "test", nifiDate2);
+        parser.addRecord(3, "rec3", "test", null);
+        parser.addRecord(4, "rec4", "test", null);
+        parser.addRecord(5, null, null, null);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+        runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR, PutDatabaseRecord.FAIL_UNMATCHED_FIELD);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutDatabaseRecord.REL_RETRY, 0);
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
+    }
+
+    private void recreateTable(String createSQL) throws ProcessException, SQLException {
+        try (final Connection conn = dbcp.getConnection();
+            final Statement stmt = conn.createStatement()) {
+            stmt.execute("drop table PERSONS");
+            stmt.execute(createSQL);
+        } catch (SQLException ignore) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
+        @Override
+        SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException, SQLException {
+            return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0,1,2,3));
+        }
+    }
+}
\ No newline at end of file