You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/04/28 02:35:44 UTC
nifi git commit: NIFI-3742: Fixed handling of nulls in DELETE for PutDatabaseRecord
Repository: nifi
Updated Branches:
refs/heads/master 8651d7977 -> d61f51932
NIFI-3742: Fixed handling of nulls in DELETE for PutDatabaseRecord
This closes #1709.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d61f5193
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d61f5193
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d61f5193
Branch: refs/heads/master
Commit: d61f5193261844661aedec68152db6fa78787a31
Parents: 8651d79
Author: Matt Burgess <ma...@apache.org>
Authored: Thu Apr 27 11:36:47 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Apr 28 11:34:43 2017 +0900
----------------------------------------------------------------------
.../processors/standard/PutDatabaseRecord.java | 32 ++++++++++---
.../standard/TestPutDatabaseRecord.groovy | 47 +++++++++++++++++++-
2 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d61f5193/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 0f65014..a591122 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -646,12 +646,24 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
if (values != null) {
if (fieldIndexes != null) {
for (int i = 0; i < fieldIndexes.size(); i++) {
- ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+ // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+ if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+ ps.setObject(i * 2 + 1, values[fieldIndexes.get(i)]);
+ ps.setObject(i * 2 + 2, values[fieldIndexes.get(i)]);
+ } else {
+ ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+ }
}
} else {
// If there's no index map, assume all values are included and set them in order
for (int i = 0; i < values.length; i++) {
- ps.setObject(i + 1, values[i]);
+ // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
+ if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+ ps.setObject(i * 2 + 1, values[i]);
+ ps.setObject(i * 2 + 2, values[i]);
+ } else {
+ ps.setObject(i + 1, values[i]);
+ }
}
}
ps.addBatch();
@@ -935,14 +947,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
sqlBuilder.append(" AND ");
}
+ String columnName;
if (settings.escapeColumnNames) {
- sqlBuilder.append(tableSchema.getQuotedIdentifierString())
- .append(desc.getColumnName())
- .append(tableSchema.getQuotedIdentifierString());
+ columnName = tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString();
} else {
- sqlBuilder.append(desc.getColumnName());
+ columnName = desc.getColumnName();
}
- sqlBuilder.append(" = ?");
+ // Need to build a null-safe construct for the WHERE clause, since we are using PreparedStatement and won't know if the values are null. If they are null,
+ // then the filter should be "column IS null" vs "column = null". Since we don't know whether the value is null, we can use the following construct (from NIFI-3742):
+ // (column = ? OR (column is null AND ? is null))
+ sqlBuilder.append("(");
+ sqlBuilder.append(columnName);
+ sqlBuilder.append(" = ? OR (");
+ sqlBuilder.append(columnName);
+ sqlBuilder.append(" is null AND ? is null))");
includedColumns.add(i);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d61f5193/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index e99f43a..355f192 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -144,7 +144,7 @@ class TestPutDatabaseRecord {
assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
generateInsert(schema, 'PERSONS', tableSchema, settings).sql)
- assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?',
+ assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
}
}
@@ -608,6 +608,51 @@ class TestPutDatabaseRecord {
conn.close()
}
+ @Test
+ void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
+ recreateTable("PERSONS", createPersons)
+ Connection conn = dbcp.getConnection()
+ Statement stmt = conn.createStatement()
+ stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)")
+ stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null)")
+ stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)")
+ stmt.close()
+
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord(2, 'rec2', null)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertEquals(101, rs.getInt(3))
+ assertTrue(rs.next())
+ assertEquals(3, rs.getInt(1))
+ assertEquals('rec3', rs.getString(2))
+ assertEquals(103, rs.getInt(3))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
+
private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement()