You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:31 UTC
[nifi] 13/15: NIFI-9194: Upsert for Oracle12+
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2273fe57660cad05943e018abcbeb2fec8d4325c
Author: Roberto Santos <rs...@gmail.com>
AuthorDate: Sat Sep 4 08:40:16 2021 -0300
NIFI-9194: Upsert for Oracle12+
Fixes pr #5366.
Fixes pr #5366. Replace tabchars fot whitespaces.
Fixes pr #5366. Replaced tabchars for whitespaces. Removed unnecessary comments.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #5366
---
.../standard/db/impl/Oracle12DatabaseAdapter.java | 105 ++++++++++++++++++++-
.../db/impl/TestOracle12DatabaseAdapter.java | 79 ++++++++++++++++
2 files changed, 179 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
index 18f3ceb..63e7379 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.processors.standard.db.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
-/**
- * A database adapter that generates MS SQL Compatible SQL.
- */
public class Oracle12DatabaseAdapter implements DatabaseAdapter {
@Override
public String getName() {
@@ -34,12 +36,14 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
}
@Override
- public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
+ Long limit, Long offset) {
return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
}
@Override
- public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
+ public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
+ Long limit, Long offset, String columnForPartitioning) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name cannot be null or empty");
}
@@ -93,4 +97,95 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
public String getTableAliasClause(String tableName) {
return tableName;
}
+
+ @Override
+ public boolean supportsUpsert() {
+ return true;
+ }
+
+ @Override
+ public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames)
+ throws IllegalArgumentException {
+ if (StringUtils.isEmpty(table)) {
+ throw new IllegalArgumentException("Table name cannot be null or blank");
+ }
+ if (columnNames == null || columnNames.isEmpty()) {
+ throw new IllegalArgumentException("Column names cannot be null or empty");
+ }
+ if (uniqueKeyColumnNames == null || uniqueKeyColumnNames.isEmpty()) {
+ throw new IllegalArgumentException("Key column names cannot be null or empty");
+ }
+
+ String newValuesAlias = "n";
+
+ String columns = columnNames.stream().collect(Collectors.joining(", ? "));
+
+ columns = "? " + columns;
+
+ List<String> columnsAssignment = getColumnsAssignment(columnNames, newValuesAlias, table);
+
+ List<String> conflictColumnsClause = getConflictColumnsClause(uniqueKeyColumnNames, columnsAssignment, table,
+ newValuesAlias);
+ String conflictClause = "(" + conflictColumnsClause.stream().collect(Collectors.joining(" AND ")) + ")";
+
+ String insertStatement = columnNames.stream().collect(Collectors.joining(", "));
+ String insertValues = newValuesAlias + "."
+ + columnNames.stream().collect(Collectors.joining(", " + newValuesAlias + "."));
+
+ columnsAssignment.removeAll(conflictColumnsClause);
+ String updateStatement = columnsAssignment.stream().collect(Collectors.joining(", "));
+
+ StringBuilder statementStringBuilder = new StringBuilder("MERGE INTO ").append(table).append(" USING (SELECT ")
+ .append(columns).append(" FROM DUAL) ").append(newValuesAlias).append(" ON ").append(conflictClause)
+ .append(" WHEN NOT MATCHED THEN INSERT (").append(insertStatement).append(") VALUES (")
+ .append(insertValues).append(")").append(" WHEN MATCHED THEN UPDATE SET ").append(updateStatement);
+
+ return statementStringBuilder.toString();
+ }
+
+ private List<String> getConflictColumnsClause(Collection<String> uniqueKeyColumnNames, List<String> conflictColumns,
+ String table, String newTableAlias) {
+ List<String> conflictColumnsClause = conflictColumns.stream()
+ .filter(column -> uniqueKeyColumnNames.stream().anyMatch(
+ uniqueKey -> column.equalsIgnoreCase(getColumnAssignment(table, uniqueKey, newTableAlias))))
+ .collect(Collectors.toList());
+
+ if (conflictColumnsClause.isEmpty()) {
+
+ // Try it with normalized columns
+ conflictColumnsClause = conflictColumns.stream()
+ .filter((column -> uniqueKeyColumnNames.stream()
+ .anyMatch(uniqueKey -> normalizeColumnName(column).equalsIgnoreCase(
+ normalizeColumnName(getColumnAssignment(table, uniqueKey, newTableAlias))))))
+ .collect(Collectors.toList());
+ }
+
+ return conflictColumnsClause;
+
+ }
+
+ private String normalizeColumnName(final String colName) {
+ return colName == null ? null : colName.toUpperCase().replace("_", "");
+ }
+
+ private List<String> getColumnsAssignment(Collection<String> columnsNames, String newTableAlias, String table) {
+ List<String> conflictClause = new ArrayList<>();
+
+ for (String columnName : columnsNames) {
+
+ StringBuilder statementStringBuilder = new StringBuilder();
+
+ statementStringBuilder.append(getColumnAssignment(table, columnName, newTableAlias));
+
+ conflictClause.add(statementStringBuilder.toString());
+
+ }
+
+ return conflictClause;
+ }
+
+ private String getColumnAssignment(String table, String columnName, String newTableAlias) {
+ return table + "." + columnName + " = " + newTableAlias + "." + columnName;
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 2315e98..99d625a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -16,6 +16,15 @@
*/
package org.apache.nifi.processors.standard.db.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.junit.Assert;
import org.junit.Test;
@@ -86,4 +95,74 @@ public class TestOracle12DatabaseAdapter {
String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
Assert.assertEquals(expected4, sql4);
}
+
+ @Test
+ public void testSupportsUpsert() throws Exception {
+ assertTrue(db.getClass().getSimpleName() + " should support upsert", db.supportsUpsert());
+ }
+
+ @Test
+ public void testGetUpsertStatementWithNullTableName() throws Exception {
+ testGetUpsertStatement(null, Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
+ }
+
+ @Test
+ public void testGetUpsertStatementWithBlankTableName() throws Exception {
+ testGetUpsertStatement("", Arrays.asList("notEmpty"), Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be null or blank"));
+ }
+
+ @Test
+ public void testGetUpsertStatementWithNullColumnNames() throws Exception {
+ testGetUpsertStatement("notEmpty", null, Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
+ }
+
+ @Test
+ public void testGetUpsertStatementWithEmptyColumnNames() throws Exception {
+ testGetUpsertStatement("notEmpty", Collections.emptyList(), Arrays.asList("notEmpty"), new IllegalArgumentException("Column names cannot be null or empty"));
+ }
+
+ @Test
+ public void testGetUpsertStatementWithNullKeyColumnNames() throws Exception {
+ testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), null, new IllegalArgumentException("Key column names cannot be null or empty"));
+ }
+
+ @Test
+ public void testGetUpsertStatementWithEmptyKeyColumnNames() throws Exception {
+ testGetUpsertStatement("notEmpty", Arrays.asList("notEmpty"), Collections.emptyList(), new IllegalArgumentException("Key column names cannot be null or empty"));
+ }
+
+ @Test
+ public void testGetUpsertStatement() throws Exception {
+ // GIVEN
+ String tableName = "table";
+ List<String> columnNames = Arrays.asList("column1","column2", "column3", "column4");
+ Collection<String> uniqueKeyColumnNames = Arrays.asList("column2","column4");
+
+ String expected = "MERGE INTO table USING (SELECT ? column1, ? column2, ? column3, ? column4 FROM DUAL) n" +
+ " ON (table.column2 = n.column2 AND table.column4 = n.column4) WHEN NOT MATCHED THEN" +
+ " INSERT (column1, column2, column3, column4) VALUES (n.column1, n.column2, n.column3, n.column4)" +
+ " WHEN MATCHED THEN UPDATE SET table.column1 = n.column1, table.column3 = n.column3";
+
+ // WHEN
+ // THEN
+ testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
+ }
+
+ private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
+ try {
+ testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, (String)null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals(expected.getMessage(), e.getMessage());
+ }
+ }
+
+ private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
+ // WHEN
+ String actual = db.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames);
+
+ // THEN
+ assertEquals(expected, actual);
+ }
+
}
\ No newline at end of file