You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:31 UTC

[nifi] 13/15: NIFI-9194: Upsert for Oracle12+

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2273fe57660cad05943e018abcbeb2fec8d4325c
Author: Roberto Santos <rs...@gmail.com>
AuthorDate: Sat Sep 4 08:40:16 2021 -0300

    NIFI-9194: Upsert for Oracle12+
    
    Fixes pr #5366.
    
    Fixes pr #5366. Replace tabchars for whitespaces.
    
    Fixes pr #5366. Replaced tabchars for whitespaces. Removed unnecessary comments.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #5366
---
 .../standard/db/impl/Oracle12DatabaseAdapter.java  | 105 ++++++++++++++++++++-
 .../db/impl/TestOracle12DatabaseAdapter.java       |  79 ++++++++++++++++
 2 files changed, 179 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
index 18f3ceb..63e7379 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/Oracle12DatabaseAdapter.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.processors.standard.db.impl;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 
-/**
- * A database adapter that generates MS SQL Compatible SQL.
- */
 public class Oracle12DatabaseAdapter implements DatabaseAdapter {
     @Override
     public String getName() {
@@ -34,12 +36,14 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
     }
 
     @Override
-    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset) {
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
+            Long limit, Long offset) {
         return getSelectStatement(tableName, columnNames, whereClause, orderByClause, limit, offset, null);
     }
 
     @Override
-    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Long limit, Long offset, String columnForPartitioning) {
+    public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause,
+            Long limit, Long offset, String columnForPartitioning) {
         if (StringUtils.isEmpty(tableName)) {
             throw new IllegalArgumentException("Table name cannot be null or empty");
         }
@@ -93,4 +97,95 @@ public class Oracle12DatabaseAdapter implements DatabaseAdapter {
     public String getTableAliasClause(String tableName) {
         return tableName;
     }
+
+    @Override
+    public boolean supportsUpsert() { // Always true: this adapter builds upserts in getUpsertStatement (a MERGE statement).
+        return true;
+    }
+
+    /**
+     * Builds an Oracle {@code MERGE} statement that inserts a new row or, when a row with the same
+     * unique-key values already exists, updates that row's non-key columns. Every column value is
+     * supplied through a bind parameter ({@code ?}) selected from {@code DUAL} under the alias {@code n}.
+     *
+     * @param table                name of the target table
+     * @param columnNames          all columns to write, in bind-parameter order
+     * @param uniqueKeyColumnNames columns identifying an existing row; they form the {@code ON} condition
+     * @return the {@code MERGE} statement text
+     * @throws IllegalArgumentException if the table name is null or empty, or if either column
+     *                                  collection is null or empty
+     */
+    @Override
+    public String getUpsertStatement(String table, List<String> columnNames, Collection<String> uniqueKeyColumnNames)
+            throws IllegalArgumentException {
+        if (StringUtils.isEmpty(table)) {
+            throw new IllegalArgumentException("Table name cannot be null or blank");
+        }
+        if (columnNames == null || columnNames.isEmpty()) {
+            throw new IllegalArgumentException("Column names cannot be null or empty");
+        }
+        if (uniqueKeyColumnNames == null || uniqueKeyColumnNames.isEmpty()) {
+            throw new IllegalArgumentException("Key column names cannot be null or empty");
+        }
+
+        final String newValuesAlias = "n";
+
+        // Source row: one bind parameter per column, aliased with the column name ("? col1, ? col2").
+        final String columns = "? " + String.join(", ? ", columnNames);
+
+        final List<String> columnsAssignment = getColumnsAssignment(columnNames, newValuesAlias, table);
+        final List<String> conflictColumnsClause = getConflictColumnsClause(uniqueKeyColumnNames, columnsAssignment,
+                table, newValuesAlias);
+        final String conflictClause = "(" + String.join(" AND ", conflictColumnsClause) + ")";
+
+        final String insertStatement = String.join(", ", columnNames);
+        final String insertValues = newValuesAlias + "."
+                + String.join(", " + newValuesAlias + ".", columnNames);
+
+        // Key columns are excluded from the UPDATE: Oracle rejects updates to columns that are
+        // referenced in the ON condition (ORA-38104). Work on a copy so the helper's list is not mutated.
+        final List<String> updateAssignments = new ArrayList<>(columnsAssignment);
+        updateAssignments.removeAll(conflictColumnsClause);
+
+        final StringBuilder statementStringBuilder = new StringBuilder("MERGE INTO ").append(table)
+                .append(" USING (SELECT ").append(columns).append(" FROM DUAL) ").append(newValuesAlias)
+                .append(" ON ").append(conflictClause)
+                .append(" WHEN NOT MATCHED THEN INSERT (").append(insertStatement).append(") VALUES (")
+                .append(insertValues).append(")");
+
+        // When every column is a key column there is nothing left to update. An empty "UPDATE SET"
+        // is invalid SQL, so the (optional) WHEN MATCHED branch is omitted in that case.
+        if (!updateAssignments.isEmpty()) {
+            statementStringBuilder.append(" WHEN MATCHED THEN UPDATE SET ")
+                    .append(String.join(", ", updateAssignments));
+        }
+
+        return statementStringBuilder.toString();
+    }
+
+    private List<String> getConflictColumnsClause(Collection<String> uniqueKeyColumnNames, List<String> conflictColumns,
+            String table, String newTableAlias) {
+        List<String> conflictColumnsClause = conflictColumns.stream()
+                .filter(column -> uniqueKeyColumnNames.stream().anyMatch(
+                        uniqueKey -> column.equalsIgnoreCase(getColumnAssignment(table, uniqueKey, newTableAlias))))
+                .collect(Collectors.toList());
+
+        if (conflictColumnsClause.isEmpty()) {
+
+            // Try it with normalized columns
+            conflictColumnsClause = conflictColumns.stream()
+                    .filter((column -> uniqueKeyColumnNames.stream()
+                            .anyMatch(uniqueKey -> normalizeColumnName(column).equalsIgnoreCase(
+                                    normalizeColumnName(getColumnAssignment(table, uniqueKey, newTableAlias))))))
+                    .collect(Collectors.toList());
+        }
+
+        return conflictColumnsClause;
+
+    }
+
+    private String normalizeColumnName(final String colName) {
+        return colName == null ? null : colName.toUpperCase().replace("_", "");
+    }
+
+    /**
+     * Renders one {@code table.column = alias.column} assignment per column, in iteration order.
+     *
+     * @param columnsNames  the columns to render
+     * @param newTableAlias alias of the new-values row
+     * @param table         target table name
+     * @return a new, modifiable list holding the rendered assignments
+     */
+    private List<String> getColumnsAssignment(Collection<String> columnsNames, String newTableAlias, String table) {
+        return columnsNames.stream()
+                .map(columnName -> getColumnAssignment(table, columnName, newTableAlias))
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    private String getColumnAssignment(String table, String columnName, String newTableAlias) {
+        return table + "." + columnName + " = " + newTableAlias + "." + columnName;
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 2315e98..99d625a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -16,6 +16,15 @@
  */
 package org.apache.nifi.processors.standard.db.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.junit.Assert;
 import org.junit.Test;
@@ -86,4 +95,74 @@ public class TestOracle12DatabaseAdapter {
         String expected4 = "SELECT some(set),of(columns),that,might,contain,methods,a.* FROM database.tablename";
         Assert.assertEquals(expected4, sql4);
     }
+
+    /** The adapter under test must report that it supports upsert. */
+    @Test
+    public void testSupportsUpsert() throws Exception {
+        final String failureMessage = db.getClass().getSimpleName() + " should support upsert";
+        final boolean upsertSupported = db.supportsUpsert();
+
+        assertTrue(failureMessage, upsertSupported);
+    }
+
+    /** A null table name must be rejected. */
+    @Test
+    public void testGetUpsertStatementWithNullTableName() throws Exception {
+        final List<String> columnNames = Collections.singletonList("notEmpty");
+        final Collection<String> keyColumnNames = Collections.singletonList("notEmpty");
+        final IllegalArgumentException expectedFailure = new IllegalArgumentException("Table name cannot be null or blank");
+
+        testGetUpsertStatement(null, columnNames, keyColumnNames, expectedFailure);
+    }
+
+    /** An empty table name must be rejected. */
+    @Test
+    public void testGetUpsertStatementWithBlankTableName() throws Exception {
+        final String emptyTableName = "";
+        final List<String> columnNames = Collections.singletonList("notEmpty");
+        final Collection<String> keyColumnNames = Collections.singletonList("notEmpty");
+        final IllegalArgumentException expectedFailure = new IllegalArgumentException("Table name cannot be null or blank");
+
+        testGetUpsertStatement(emptyTableName, columnNames, keyColumnNames, expectedFailure);
+    }
+
+    /** A null column list must be rejected. */
+    @Test
+    public void testGetUpsertStatementWithNullColumnNames() throws Exception {
+        final List<String> missingColumnNames = null;
+        final Collection<String> keyColumnNames = Collections.singletonList("notEmpty");
+        final IllegalArgumentException expectedFailure = new IllegalArgumentException("Column names cannot be null or empty");
+
+        testGetUpsertStatement("notEmpty", missingColumnNames, keyColumnNames, expectedFailure);
+    }
+
+    /** An empty column list must be rejected. */
+    @Test
+    public void testGetUpsertStatementWithEmptyColumnNames() throws Exception {
+        final List<String> noColumnNames = Collections.emptyList();
+        final Collection<String> keyColumnNames = Collections.singletonList("notEmpty");
+        final IllegalArgumentException expectedFailure = new IllegalArgumentException("Column names cannot be null or empty");
+
+        testGetUpsertStatement("notEmpty", noColumnNames, keyColumnNames, expectedFailure);
+    }
+
+    /** A null unique-key column collection must be rejected. */
+    @Test
+    public void testGetUpsertStatementWithNullKeyColumnNames() throws Exception {
+        final List<String> columnNames = Collections.singletonList("notEmpty");
+        final Collection<String> missingKeyColumnNames = null;
+        final IllegalArgumentException expectedFailure = new IllegalArgumentException("Key column names cannot be null or empty");
+
+        testGetUpsertStatement("notEmpty", columnNames, missingKeyColumnNames, expectedFailure);
+    }
+
+    /** An empty unique-key column collection must be rejected. */
+    @Test
+    public void testGetUpsertStatementWithEmptyKeyColumnNames() throws Exception {
+        final List<String> columnNames = Collections.singletonList("notEmpty");
+        final Collection<String> noKeyColumnNames = Collections.emptyList();
+        final IllegalArgumentException expectedFailure = new IllegalArgumentException("Key column names cannot be null or empty");
+
+        testGetUpsertStatement("notEmpty", columnNames, noKeyColumnNames, expectedFailure);
+    }
+
+    /** Checks the full MERGE text generated for four columns, two of which form the unique key. */
+    @Test
+    public void testGetUpsertStatement() throws Exception {
+        // GIVEN: a four-column table whose unique key is (column2, column4)
+        final String tableName = "table";
+        final List<String> columnNames = Arrays.asList("column1", "column2", "column3", "column4");
+        final Collection<String> uniqueKeyColumnNames = Arrays.asList("column2", "column4");
+
+        // Key columns appear only in the ON condition; the remaining columns are the ones updated.
+        final String expected = new StringBuilder()
+                .append("MERGE INTO table USING (SELECT ? column1, ? column2, ? column3, ? column4 FROM DUAL) n")
+                .append(" ON (table.column2 = n.column2 AND table.column4 = n.column4) WHEN NOT MATCHED THEN")
+                .append(" INSERT (column1, column2, column3, column4) VALUES (n.column1, n.column2, n.column3, n.column4)")
+                .append(" WHEN MATCHED THEN UPDATE SET table.column1 = n.column1, table.column3 = n.column3")
+                .toString();
+
+        // WHEN / THEN: the helper generates the statement and compares it with the expectation
+        testGetUpsertStatement(tableName, columnNames, uniqueKeyColumnNames, expected);
+    }
+
+    /**
+     * Asserts that building an upsert statement from the given arguments is rejected with an
+     * {@link IllegalArgumentException} whose message equals the message of {@code expected}.
+     *
+     * @param tableName            table name passed to the adapter
+     * @param columnNames          column names passed to the adapter
+     * @param uniqueKeyColumnNames unique-key column names passed to the adapter
+     * @param expected             exception carrying the expected message
+     */
+    private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, IllegalArgumentException expected) {
+        String actual;
+        try {
+            // Call the adapter directly: going through the String-expecting overload would report a
+            // confusing "expected null" mismatch instead of the missing exception.
+            actual = db.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames);
+        } catch (IllegalArgumentException e) {
+            assertEquals(expected.getMessage(), e.getMessage());
+            return;
+        }
+        fail("Expected IllegalArgumentException with message \"" + expected.getMessage()
+                + "\" but a statement was generated: " + actual);
+    }
+
+    /**
+     * Generates an upsert statement with the adapter under test and asserts it equals {@code expected}.
+     *
+     * @param tableName            table name passed to the adapter
+     * @param columnNames          column names passed to the adapter
+     * @param uniqueKeyColumnNames unique-key column names passed to the adapter
+     * @param expected             the exact statement text expected
+     */
+    private void testGetUpsertStatement(String tableName, List<String> columnNames, Collection<String> uniqueKeyColumnNames, String expected) {
+        assertEquals(expected, db.getUpsertStatement(tableName, columnNames, uniqueKeyColumnNames));
+    }
+
 }
\ No newline at end of file