You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/10/17 15:32:09 UTC

[nifi] branch main updated: NIFI-10635: Fix handling of enums in PutDatabaseRecord

This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a76abef270 NIFI-10635: Fix handling of enums in PutDatabaseRecord
a76abef270 is described below

commit a76abef270f7ac875a509060f81a491ca49b0a9d
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Oct 12 14:17:50 2022 -0400

    NIFI-10635: Fix handling of enums in PutDatabaseRecord
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #6518.
---
 .../serialization/record/util/DataTypeUtils.java   |  2 +
 .../processors/standard/PutDatabaseRecord.java     | 14 ++++++-
 .../processors/standard/DBCPServiceSimpleImpl.java | 23 +++++++++--
 .../processors/standard/PutDatabaseRecordTest.java | 48 +++++++++++++++++++++-
 4 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 5000c78fe1..d6db0b78fa 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -2131,6 +2131,8 @@ public class DataTypeUtils {
                 return Types.SMALLINT;
             case STRING:
                 return Types.VARCHAR;
+            case ENUM:
+                return Types.OTHER;
             case TIME:
                 return Types.TIME;
             case TIMESTAMP:
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 3df3c89388..bcb26ceca5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -863,7 +863,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
             }
         } else {
             try {
-                ps.setObject(index, value, sqlType);
+                // If the specified field type is OTHER and the SQL type is VARCHAR, the conversion went ok as a string literal but try the OTHER type when setting the parameter. If an error occurs,
+                // try the normal way of using the sqlType
+                // This helps with PostgreSQL enums and possibly other scenarios
+                if (fieldSqlType == Types.OTHER && sqlType == Types.VARCHAR) {
+                    try {
+                        ps.setObject(index, value, fieldSqlType);
+                    } catch (SQLException e) {
+                        // Fall back to default setObject params
+                        ps.setObject(index, value, sqlType);
+                    }
+                } else {
+                    ps.setObject(index, value, sqlType);
+                }
             } catch (SQLException e) {
                 throw new IOException("Unable to setObject() with value " + value + " at index " + index + " of type " + sqlType , e);
             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java
index aa1e761097..d1337a7726 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java
@@ -20,18 +20,27 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 
 /**
- * Simple implementation only for GenerateTableFetch processor testing.
+ * Simple implementation only for DB processor testing.
  */
 public class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
 
     private String databaseLocation;
+    private boolean isDerby;
 
+    // Default to use Derby connection
     public DBCPServiceSimpleImpl(final String databaseLocation) {
+        this(databaseLocation, true);
+    }
+
+    public DBCPServiceSimpleImpl(final String databaseLocation, final boolean isDerby) {
         this.databaseLocation = databaseLocation;
+        this.isDerby = isDerby;
     }
 
     @Override
@@ -42,8 +51,16 @@ public class DBCPServiceSimpleImpl extends AbstractControllerService implements
     @Override
     public Connection getConnection() throws ProcessException {
         try {
-            Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-            return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true");
+            if (isDerby) {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true");
+            } else {
+                // Use H2
+                Path currentPath = Paths.get("");
+                String absolutePathPrefix = currentPath.toFile().getAbsolutePath();
+                String connectionString = "jdbc:h2:file:" + absolutePathPrefix + "/" + databaseLocation + ";DB_CLOSE_ON_EXIT=TRUE";
+                return DriverManager.getConnection(connectionString, "SA", "");
+            }
         } catch (final Exception e) {
             throw new ProcessException("getConnection failed: " + e);
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index dfef1eacdb..bc88542531 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -51,7 +51,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
-import java.sql.SQLNonTransientConnectionException;
 import java.sql.Statement;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
@@ -115,7 +114,7 @@ public class PutDatabaseRecordTest {
     public static void shutdownDatabase() throws Exception {
         try {
             DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
-        } catch (SQLNonTransientConnectionException ignore) {
+        } catch (Exception ignore) {
             // Do nothing, this is what happens at Derby shutdown
         }
         // remove previous test database, if any
@@ -1759,6 +1758,51 @@ public class PutDatabaseRecordTest {
         runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
     }
 
+    @Test
+    void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
+        dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("dbcp", dbcp, new HashMap<>());
+        runner.enableControllerService(dbcp);
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+        try (Connection conn = dbcp.getConnection()) {
+            conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
+        }
+        recreateTable("CREATE TABLE IF NOT EXISTS ENUM_TEST (id integer primary key, suit ENUM('clubs', 'diamonds', 'hearts', 'spades'))");
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("suit", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("clubs", "diamonds", "hearts", "spades")).getFieldType());
+
+        parser.addRecord(1, "diamonds");
+        parser.addRecord(2, "hearts");
+
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "ENUM_TEST");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+        final Connection conn = dbcp.getConnection();
+        final Statement stmt = conn.createStatement();
+        final ResultSet rs = stmt.executeQuery("SELECT * FROM ENUM_TEST");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("diamonds", rs.getString(2));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("hearts", rs.getString(2));
+        assertFalse(rs.next());
+
+        stmt.close();
+        conn.close();
+    }
+
     private void recreateTable() throws ProcessException {
         try (final Connection conn = dbcp.getConnection();
             final Statement stmt = conn.createStatement()) {