Posted to commits@nifi.apache.org by pv...@apache.org on 2021/02/05 20:01:11 UTC

[nifi] branch main updated: NIFI-8172: Provide schema name to getPrimaryKeys call in PutDatabaseRecord

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b77dbd5  NIFI-8172: Provide schema name to getPrimaryKeys call in PutDatabaseRecord
b77dbd5 is described below

commit b77dbd503099addb760e2d1f4b1249a90839cd97
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Jan 29 15:01:13 2021 -0500

    NIFI-8172: Provide schema name to getPrimaryKeys call in PutDatabaseRecord
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4782.
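
For context, the change swaps a null schema argument for the configured Schema Name when PutDatabaseRecord asks JDBC for a table's primary keys. A minimal sketch of that call, assuming a plain JDBC Connection (the helper below is illustrative, not NiFi code):

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.LinkedHashSet;
    import java.util.Set;

    // Illustrative helper, not NiFi code: shows the JDBC lookup this commit scopes by schema.
    final class PrimaryKeyLookup {
        // Returns the primary key column names of catalog/schema/table.
        static Set<String> primaryKeyColumns(final Connection conn, final String catalog,
                                             final String schema, final String table) throws SQLException {
            final Set<String> pkColumns = new LinkedHashSet<>();
            final DatabaseMetaData dmd = conn.getMetaData();
            // Before this commit the processor passed null for the schema argument, so a table
            // of the same name in another schema could supply the key columns. Passing the
            // configured schema pins the lookup to the intended table.
            try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, table)) {
                while (pkrs.next()) {
                    pkColumns.add(pkrs.getString("COLUMN_NAME"));
                }
            }
            return pkColumns;
        }
    }

With the two tables created in the new test below (SCHEMA1.PERSONS keyed on id, SCHEMA2.PERSONS keyed on id2), a call like primaryKeyColumns(conn, null, "SCHEMA1", "PERSONS") returns only ID, whereas a null schema argument could also surface ID2 from the other schema's table.
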
---
 .../processors/standard/PutDatabaseRecord.java     |  53 +++++--
 .../standard/TestPutDatabaseRecord.groovy          | 163 +++++++++++++++++----
 2 files changed, 171 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index f3dd970..5ef883a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -207,7 +207,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor CATALOG_NAME = new Builder()
             .name("put-db-record-catalog-name")
             .displayName("Catalog Name")
-            .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty")
+            .description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the "
+                    + "property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.")
             .required(false)
             .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -216,7 +217,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor SCHEMA_NAME = new Builder()
             .name("put-db-record-schema-name")
             .displayName("Schema Name")
-            .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty")
+            .description("The name of the schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the "
+                    + "property is set and the database is case-sensitive, the schema name must match the database's schema name exactly.")
             .required(false)
             .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -225,7 +227,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor TABLE_NAME = new Builder()
             .name("put-db-record-table-name")
             .displayName("Table Name")
-            .description("The name of the table that the statement should affect.")
+            .description("The name of the table that the statement should affect. Note that if the database is case-sensitive, the table name must match the database's table name exactly.")
             .required(true)
             .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -887,7 +889,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
-                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
+                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
 
                 if (desc != null) {
@@ -903,6 +906,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         sqlBuilder.append(desc.getColumnName());
                     }
                     includedColumns.add(i);
+                } else {
+                    // User is ignoring unmapped fields, but log at debug level just in case
+                    getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
             }
 
@@ -912,7 +919,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
             sqlBuilder.append(")");
 
             if (fieldsFound.get() == 0) {
-                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table\n"
+                        + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
             }
         }
         return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
@@ -940,7 +948,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
-                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
+                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
 
                 if (desc != null) {
@@ -950,6 +959,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         usedColumnNames.add(desc.getColumnName());
                     }
                     usedColumnIndices.add(i);
+                } else {
+                    // User is ignoring unmapped fields, but log at debug level just in case
+                    getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
             }
         }
@@ -981,7 +994,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
-                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
+                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
 
                 if (desc != null) {
@@ -991,6 +1005,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         usedColumnNames.add(desc.getColumnName());
                     }
                     usedColumnIndices.add(i);
+                } else {
+                    // User is ignoring unmapped fields, but log at debug level just in case
+                    getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
             }
         }
@@ -1029,8 +1047,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc == null) {
                     if (!settings.ignoreUnmappedFields) {
-                        throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
+                        throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+                                + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                     } else {
+                        // User is ignoring unmapped fields, but log at debug level just in case
+                        getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+                                + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                         continue;
                     }
                 }
@@ -1127,7 +1149,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc == null && !settings.ignoreUnmappedFields) {
-                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
+                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
 
                 if (desc != null) {
@@ -1150,12 +1173,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     sqlBuilder.append(columnName);
                     sqlBuilder.append(" is null AND ? is null))");
                     includedColumns.add(i);
-
+                } else {
+                    // User is ignoring unmapped fields, but log at debug level just in case
+                    getLogger().debug("Did not map field '" + fieldName + "' to any column in the database\n"
+                            + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
                 }
             }
 
             if (fieldsFound.get() == 0) {
-                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table\n"
+                        + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", tableSchema.getColumns().keySet()));
             }
         }
 
@@ -1192,7 +1219,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         if (updateKeyColumnNames.isEmpty()) {
-            throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+            throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' not found or does not have a Primary Key and no Update Keys were specified");
         }
 
         return updateKeyColumnNames;
@@ -1281,7 +1308,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final Set<String> primaryKeyColumns = new HashSet<>();
                 if (includePrimaryKeys) {
-                    try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName)) {
+                    try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, schema, tableName)) {
 
                         while (pkrs.next()) {
                             final String colName = pkrs.getString("COLUMN_NAME");
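
Beyond the getPrimaryKeys change, the hunks above also enrich the unmapped-field handling: the SQLDataException messages now list the table's known (optionally normalized) column names, and fields skipped because of Unmatched Field Behavior are logged at debug level. A rough sketch of the resulting message shape, using a hypothetical helper name (buildUnmappedFieldMessage) rather than the processor's actual internals:

    import java.util.Set;

    // Illustrative only: mirrors the message format added in the diff,
    // "Cannot map field '<field>' to any column in the database\n[Normalized ]Columns: a,b,c"
    final class UnmappedFieldMessages {
        static String buildUnmappedFieldMessage(final String fieldName,
                                                final Set<String> columnKeys,
                                                final boolean translateFieldNames) {
            return "Cannot map field '" + fieldName + "' to any column in the database\n"
                    + (translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", columnKeys);
        }
    }

This is the format the updated Groovy assertions below expect, e.g. "Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id".
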
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 558eb2f..d252f5c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -48,7 +48,6 @@ import java.sql.SQLException
 import java.sql.SQLNonTransientConnectionException
 import java.sql.Statement
 import java.time.LocalDate
-import java.time.ZoneId
 import java.time.ZoneOffset
 import java.util.function.Supplier
 
@@ -72,8 +71,11 @@ import static org.mockito.Mockito.verify
 class TestPutDatabaseRecord {
 
     private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
-            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000)," +
-            " dt date)"
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
+    private static final String createPersonsSchema1 = "CREATE TABLE SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
+    private static final String createPersonsSchema2 = "CREATE TABLE SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
+            " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000), dt date)"
     private final static String DB_LOCATION = "target/db_pdr"
 
     TestRunner runner
@@ -214,28 +216,28 @@ class TestPutDatabaseRecord {
                 generateInsert(schema, 'PERSONS', tableSchema, settings)
                 fail('generateInsert should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
             }
 
             try {
                 generateUpdate(schema, 'PERSONS', null, tableSchema, settings)
                 fail('generateUpdate should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
             }
 
             try {
                 generateDelete(schema, 'PERSONS', tableSchema, settings)
                 fail('generateDelete should fail with unmatched fields')
             } catch (SQLDataException e) {
-                assertEquals("Cannot map field 'non_existing' to any column in the database", e.getMessage())
+                assertEquals("Cannot map field 'non_existing' to any column in the database\nColumns: code,name,id", e.getMessage())
             }
         }
     }
 
     @Test
     void testInsert() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -302,7 +304,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testInsertBatchUpdateException() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -336,7 +338,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testInsertBatchUpdateExceptionRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -369,8 +371,8 @@ class TestPutDatabaseRecord {
     }
 
     @Test
-    void testInsertNoTable() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+    void testInsertNoTableSpecified() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -393,8 +395,32 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testInsertNoTableExists() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable(createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 101)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS2')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+    }
+
+    @Test
     void testInsertViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -434,7 +460,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -474,7 +500,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -510,7 +536,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testInvalidData() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -545,7 +571,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testIOExceptionOnReadData() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -580,7 +606,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -605,7 +631,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testSqlStatementTypeNoValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -632,7 +658,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testUpdate() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -676,8 +702,81 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testUpdateMultipleSchemas() throws InitializationException, ProcessException, SQLException, IOException {
+        // Manually create and drop the tables and schemas
+        def conn = dbcp.connection
+        def stmt = conn.createStatement()
+        stmt.execute('create schema SCHEMA1')
+        stmt.execute('create schema SCHEMA2')
+        stmt.execute(createPersonsSchema1)
+        stmt.execute(createPersonsSchema2)
+
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 201)
+        parser.addRecord(2, 'rec2', 202)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.UPDATE_TYPE)
+        runner.setProperty(PutDatabaseRecord.SCHEMA_NAME, "SCHEMA1")
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        // Set some existing records with different values for name and code
+        Exception e
+        ResultSet rs
+        try {
+            stmt.execute('''INSERT INTO SCHEMA1.PERSONS VALUES (1,'x1',101,null)''')
+            stmt.execute('''INSERT INTO SCHEMA2.PERSONS VALUES (2,'x2',102,null)''')
+
+            runner.enqueue(new byte[0])
+            runner.run()
+
+            runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+            rs = stmt.executeQuery('SELECT * FROM SCHEMA1.PERSONS')
+            assertTrue(rs.next())
+            assertEquals(1, rs.getInt(1))
+            assertEquals('rec1', rs.getString(2))
+            assertEquals(201, rs.getInt(3))
+            assertFalse(rs.next())
+            rs = stmt.executeQuery('SELECT * FROM SCHEMA2.PERSONS')
+            assertTrue(rs.next())
+            assertEquals(2, rs.getInt(1))
+            // Values should not have been updated
+            assertEquals('x2', rs.getString(2))
+            assertEquals(102, rs.getInt(3))
+            assertFalse(rs.next())
+        } catch(ex) {
+            e = ex
+        }
+
+        // Drop the schemas here so as not to interfere with other tests
+        stmt.execute("drop table SCHEMA1.PERSONS")
+        stmt.execute("drop table SCHEMA2.PERSONS")
+        stmt.execute("drop schema SCHEMA1 RESTRICT")
+        stmt.execute("drop schema SCHEMA2 RESTRICT")
+        stmt.close()
+
+        // Don't proceed if there was a problem with the asserts
+        if(e) throw e
+        rs = conn.metaData.schemas
+        List<String> schemas = new ArrayList<>()
+        while(rs.next()) {
+            schemas += rs.getString(1)
+        }
+        assertFalse(schemas.contains('SCHEMA1'))
+        assertFalse(schemas.contains('SCHEMA2'))
+        conn.close()
+    }
+
+    @Test
     void testUpdateAfterInsert() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -699,7 +798,6 @@ class TestPutDatabaseRecord {
         runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
         final Connection conn = dbcp.getConnection()
         Statement stmt = conn.createStatement()
-        stmt = conn.createStatement()
         ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
         assertTrue(rs.next())
         assertEquals(1, rs.getInt(1))
@@ -737,7 +835,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testUpdateNoPrimaryKeys() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
+        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -753,12 +851,12 @@ class TestPutDatabaseRecord {
         runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
         runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDatabaseRecord.REL_FAILURE).get(0)
-        assertEquals('Table \'PERSONS\' does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR))
+        assertEquals('Table \'PERSONS\' not found or does not have a Primary Key and no Update Keys were specified', flowFile.getAttribute(PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR))
     }
 
     @Test
     void testUpdateSpecifyUpdateKeys() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
+        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -804,7 +902,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testDelete() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         Connection conn = dbcp.getConnection()
         Statement stmt = conn.createStatement()
         stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
@@ -848,7 +946,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         Connection conn = dbcp.getConnection()
         Statement stmt = conn.createStatement()
         stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
@@ -892,7 +990,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testRecordPathOptions() {
-        recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
+        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), code integer)')
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -944,7 +1042,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -977,7 +1075,7 @@ class TestPutDatabaseRecord {
 
     @Test
     void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException {
-        recreateTable("PERSONS", createPersons)
+        recreateTable(createPersons)
         final MockRecordParser parser = new MockRecordParser()
         runner.addControllerService("parser", parser)
         runner.enableControllerService(parser)
@@ -1036,18 +1134,19 @@ class TestPutDatabaseRecord {
         }
     }
 
-    private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
+    private void recreateTable(String createSQL) throws ProcessException, SQLException {
         final Connection conn = dbcp.getConnection()
         final Statement stmt = conn.createStatement()
         try {
-            stmt.executeUpdate("drop table " + tableName)
+            stmt.execute("drop table PERSONS")
         } catch (SQLException ignore) {
             // Do nothing, may not have existed
         }
-        stmt.executeUpdate(createSQL)
+        stmt.execute(createSQL)
         stmt.close()
         conn.close()
     }
+
     @Test
     void testGenerateTableName() throws Exception {