You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/28 02:35:44 UTC

nifi git commit: NIFI-3742: Fixed handling of nulls in DELETE for PutDatabaseRecord

Repository: nifi
Updated Branches:
  refs/heads/master 8651d7977 -> d61f51932


NIFI-3742: Fixed handling of nulls in DELETE for PutDatabaseRecord

This closes #1709.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d61f5193
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d61f5193
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d61f5193

Branch: refs/heads/master
Commit: d61f5193261844661aedec68152db6fa78787a31
Parents: 8651d79
Author: Matt Burgess <ma...@apache.org>
Authored: Thu Apr 27 11:36:47 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Apr 28 11:34:43 2017 +0900

----------------------------------------------------------------------
 .../processors/standard/PutDatabaseRecord.java  | 32 ++++++++++---
 .../standard/TestPutDatabaseRecord.groovy       | 47 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d61f5193/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 0f65014..a591122 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -646,12 +646,24 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
                 if (values != null) {
                     if (fieldIndexes != null) {
                         for (int i = 0; i < fieldIndexes.size(); i++) {
-                            ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+                            // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+                            if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+                                ps.setObject(i * 2 + 1, values[fieldIndexes.get(i)]);
+                                ps.setObject(i * 2 + 2, values[fieldIndexes.get(i)]);
+                            } else {
+                                ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+                            }
                         }
                     } else {
                         // If there's no index map, assume all values are included and set them in order
                         for (int i = 0; i < values.length; i++) {
-                            ps.setObject(i + 1, values[i]);
+                            // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+                            if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+                                ps.setObject(i * 2 + 1, values[i]);
+                                ps.setObject(i * 2 + 2, values[i]);
+                            } else {
+                                ps.setObject(i + 1, values[i]);
+                            }
                         }
                     }
                     ps.addBatch();
@@ -935,14 +947,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
                         sqlBuilder.append(" AND ");
                     }
 
+                    String columnName;
                     if (settings.escapeColumnNames) {
-                        sqlBuilder.append(tableSchema.getQuotedIdentifierString())
-                                .append(desc.getColumnName())
-                                .append(tableSchema.getQuotedIdentifierString());
+                        columnName = tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString();
                     } else {
-                        sqlBuilder.append(desc.getColumnName());
+                        columnName = desc.getColumnName();
                     }
-                    sqlBuilder.append(" = ?");
+                    // Need to build a null-safe construct for the WHERE clause, since we are using PreparedStatement and won't know if the values are null. If they are null,
+                    // then the filter should be "column IS null" vs "column = null". Since we don't know whether the value is null, we can use the following construct (from NIFI-3742):
+                    //   (column = ? OR (column is null AND ? is null))
+                    sqlBuilder.append("(");
+                    sqlBuilder.append(columnName);
+                    sqlBuilder.append(" = ? OR (");
+                    sqlBuilder.append(columnName);
+                    sqlBuilder.append(" is null AND ? is null))");
                     includedColumns.add(i);
 
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d61f5193/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index e99f43a..355f192 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -144,7 +144,7 @@ class TestPutDatabaseRecord {
             assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
                     generateInsert(schema, 'PERSONS', tableSchema, settings).sql)
 
-            assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?',
+            assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
                     generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
         }
     }
@@ -608,6 +608,51 @@ class TestPutDatabaseRecord {
         conn.close()
     }
 
+    @Test
+    void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        Connection conn = dbcp.getConnection()
+        Statement stmt = conn.createStatement()
+        stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)")
+        stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null)")
+        stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)")
+        stmt.close()
+
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(2, 'rec2', null)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(101, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(3, rs.getInt(1))
+        assertEquals('rec3', rs.getString(2))
+        assertEquals(103, rs.getInt(3))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+
     private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
         final Connection conn = dbcp.getConnection()
         final Statement stmt = conn.createStatement()