You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/10/17 15:32:09 UTC
[nifi] branch main updated: NIFI-10635: Fix handling of enums in PutDatabaseRecord
This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a76abef270 NIFI-10635: Fix handling of enums in PutDatabaseRecord
a76abef270 is described below
commit a76abef270f7ac875a509060f81a491ca49b0a9d
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Oct 12 14:17:50 2022 -0400
NIFI-10635: Fix handling of enums in PutDatabaseRecord
Signed-off-by: Nathan Gough <th...@gmail.com>
This closes #6518.
---
.../serialization/record/util/DataTypeUtils.java | 2 +
.../processors/standard/PutDatabaseRecord.java | 14 ++++++-
.../processors/standard/DBCPServiceSimpleImpl.java | 23 +++++++++--
.../processors/standard/PutDatabaseRecordTest.java | 48 +++++++++++++++++++++-
4 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 5000c78fe1..d6db0b78fa 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -2131,6 +2131,8 @@ public class DataTypeUtils {
return Types.SMALLINT;
case STRING:
return Types.VARCHAR;
+ case ENUM:
+ return Types.OTHER;
case TIME:
return Types.TIME;
case TIMESTAMP:
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 3df3c89388..bcb26ceca5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -863,7 +863,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
} else {
try {
- ps.setObject(index, value, sqlType);
+ // If the specified field type is OTHER and the SQL type is VARCHAR, the conversion went ok as a string literal but try the OTHER type when setting the parameter. If an error occurs,
+ // try the normal way of using the sqlType
+ // This helps with PostgreSQL enums and possibly other scenarios
+ if (fieldSqlType == Types.OTHER && sqlType == Types.VARCHAR) {
+ try {
+ ps.setObject(index, value, fieldSqlType);
+ } catch (SQLException e) {
+ // Fall back to default setObject params
+ ps.setObject(index, value, sqlType);
+ }
+ } else {
+ ps.setObject(index, value, sqlType);
+ }
} catch (SQLException e) {
throw new IOException("Unable to setObject() with value " + value + " at index " + index + " of type " + sqlType , e);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java
index aa1e761097..d1337a7726 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/DBCPServiceSimpleImpl.java
@@ -20,18 +20,27 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
/**
- * Simple implementation only for GenerateTableFetch processor testing.
+ * Simple implementation only for DB processor testing.
*/
public class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
private String databaseLocation;
+ private boolean isDerby;
+ // Default to use Derby connection
public DBCPServiceSimpleImpl(final String databaseLocation) {
+ this(databaseLocation, true);
+ }
+
+ public DBCPServiceSimpleImpl(final String databaseLocation, final boolean isDerby) {
this.databaseLocation = databaseLocation;
+ this.isDerby = isDerby;
}
@Override
@@ -42,8 +51,16 @@ public class DBCPServiceSimpleImpl extends AbstractControllerService implements
@Override
public Connection getConnection() throws ProcessException {
try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true");
+ if (isDerby) {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+ return DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true");
+ } else {
+ // Use H2
+ Path currentPath = Paths.get("");
+ String absolutePathPrefix = currentPath.toFile().getAbsolutePath();
+ String connectionString = "jdbc:h2:file:" + absolutePathPrefix + "/" + databaseLocation + ";DB_CLOSE_ON_EXIT=TRUE";
+ return DriverManager.getConnection(connectionString, "SA", "");
+ }
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index dfef1eacdb..bc88542531 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -51,7 +51,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
-import java.sql.SQLNonTransientConnectionException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
@@ -115,7 +114,7 @@ public class PutDatabaseRecordTest {
public static void shutdownDatabase() throws Exception {
try {
DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
- } catch (SQLNonTransientConnectionException ignore) {
+ } catch (Exception ignore) {
// Do nothing, this is what happens at Derby shutdown
}
// remove previous test database, if any
@@ -1759,6 +1758,51 @@ public class PutDatabaseRecordTest {
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
}
+ @Test
+ void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
+ dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("dbcp", dbcp, new HashMap<>());
+ runner.enableControllerService(dbcp);
+ runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
+ try (Connection conn = dbcp.getConnection()) {
+ conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
+ }
+ recreateTable("CREATE TABLE IF NOT EXISTS ENUM_TEST (id integer primary key, suit ENUM('clubs', 'diamonds', 'hearts', 'spades'))");
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService("parser", parser);
+ runner.enableControllerService(parser);
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("suit", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("clubs", "diamonds", "hearts", "spades")).getFieldType());
+
+ parser.addRecord(1, "diamonds");
+ parser.addRecord(2, "hearts");
+
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, "ENUM_TEST");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+ final Connection conn = dbcp.getConnection();
+ final Statement stmt = conn.createStatement();
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM ENUM_TEST");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("diamonds", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("hearts", rs.getString(2));
+ assertFalse(rs.next());
+
+ stmt.close();
+ conn.close();
+ }
+
private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) {