You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/14 00:34:29 UTC

[nifi] branch master updated (0e10e41 -> 4db5446)

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 0e10e41  NIFI-4358 This closes #3363. cassandra connection enable compression at resquest and response
     new ca76fe1  NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils
     new 4db5446  NIFI-6082: Refactor the way to handle fields nullable

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../nifi-database-test-utils}/pom.xml              |  16 +-
 .../apache/nifi/util/db/SimpleCommerceDataSet.java | 112 ++++++++++
 .../pom.xml                                        |  44 +++-
 .../java/org/apache/nifi/util/db}/AvroUtil.java    |   2 +-
 .../java/org/apache/nifi/util/db}/JdbcCommon.java  |  71 +------
 .../apache/nifi/util/db}/JdbcCommonTestUtils.java  |   2 +-
 .../org/apache/nifi/util/db}/TestJdbcCommon.java   |  18 +-
 .../nifi/util/db}/TestJdbcCommonConvertToAvro.java |   8 +-
 .../apache/nifi/util/db}/TestJdbcHugeStream.java   |   2 +-
 .../apache/nifi/util/db}/TestJdbcTypesDerby.java   |   2 +-
 .../org/apache/nifi/util/db}/TestJdbcTypesH2.java  |   2 +-
 nifi-nar-bundles/nifi-extension-utils/pom.xml      |   2 +
 .../nifi-standard-processors/pom.xml               |  11 +
 .../processors/standard/AbstractExecuteSQL.java    |   2 +-
 .../standard/AbstractQueryDatabaseTable.java       |   2 +-
 .../nifi/processors/standard/ExecuteSQL.java       |  12 +-
 .../nifi/processors/standard/ExecuteSQLRecord.java |   5 +-
 .../nifi/processors/standard/LookupRecord.java     |  16 +-
 .../apache/nifi/processors/standard/PutSQL.java    |   2 +-
 .../processors/standard/QueryDatabaseTable.java    |  10 +-
 .../standard/QueryDatabaseTableRecord.java         |   4 +-
 .../standard/sql/DefaultAvroSqlWriter.java         |  12 +-
 .../processors/standard/sql/RecordSqlWriter.java   |  14 +-
 .../nifi/processors/standard/sql/SqlWriter.java    |   6 +-
 .../processors/standard/util/JdbcProperties.java   |  81 ++++++++
 .../nifi/processors/standard/TestExecuteSQL.java   |   6 +-
 .../processors/standard/TestExecuteSQLRecord.java  |   4 +-
 .../nifi/processors/standard/TestLookupRecord.java |  40 ++++
 .../nifi-lookup-services/pom.xml                   |  29 ++-
 .../lookup/db/AbstractDatabaseLookupService.java   | 104 ++++++++++
 .../lookup/db/DatabaseRecordLookupService.java     | 190 +++++++++++++++++
 .../lookup/db/SimpleDatabaseLookupService.java     | 174 ++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   2 +
 .../db/TestDatabaseRecordLookupService.groovy      | 229 +++++++++++++++++++++
 .../db/TestSimpleDatabaseLookupService.groovy      | 184 +++++++++++++++++
 35 files changed, 1274 insertions(+), 146 deletions(-)
 copy nifi-nar-bundles/{nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-content-access => nifi-extension-utils/nifi-database-test-utils}/pom.xml (71%)
 create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
 copy nifi-nar-bundles/nifi-extension-utils/{nifi-record-utils/nifi-avro-record-utils => nifi-database-utils}/pom.xml (73%)
 mode change 100755 => 100644
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db}/AvroUtil.java (97%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db}/JdbcCommon.java (90%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db}/JdbcCommonTestUtils.java (97%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db}/TestJdbcCommon.java (97%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db}/TestJdbcCommonConvertToAvro.java (92%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db}/TestJdbcHugeStream.java (99%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db}/TestJdbcTypesDerby.java (99%)
 rename nifi-nar-bundles/{nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util => nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db}/TestJdbcTypesH2.java (99%)
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
 create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy


[nifi] 01/02: NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ca76fe178cfae8890b347081260cd59a62321219
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 27 16:17:46 2019 -0500

    NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils
    
    NIFI-6082: Added SimpleDatabaseLookupService
    
    NIFI-6082: Merged Koji's improvements, incorporated review comments
    
    This closes #3341.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../serialization/record/ResultSetRecordSet.java   |  18 +-
 .../{ => nifi-database-test-utils}/pom.xml         |  26 +--
 .../apache/nifi/util/db/SimpleCommerceDataSet.java | 112 ++++++++++
 .../nifi-database-utils/pom.xml                    | 101 +++++++++
 .../java/org/apache/nifi/util/db}/AvroUtil.java    |   2 +-
 .../java/org/apache/nifi/util/db}/JdbcCommon.java  |  71 +------
 .../apache/nifi/util/db}/JdbcCommonTestUtils.java  |   2 +-
 .../org/apache/nifi/util/db}/TestJdbcCommon.java   |  18 +-
 .../nifi/util/db}/TestJdbcCommonConvertToAvro.java |   8 +-
 .../apache/nifi/util/db}/TestJdbcHugeStream.java   |   2 +-
 .../apache/nifi/util/db}/TestJdbcTypesDerby.java   |   2 +-
 .../org/apache/nifi/util/db}/TestJdbcTypesH2.java  |   2 +-
 nifi-nar-bundles/nifi-extension-utils/pom.xml      |   2 +
 .../nifi-standard-processors/pom.xml               |  11 +
 .../processors/standard/AbstractExecuteSQL.java    |   2 +-
 .../standard/AbstractQueryDatabaseTable.java       |   2 +-
 .../nifi/processors/standard/ExecuteSQL.java       |  12 +-
 .../nifi/processors/standard/ExecuteSQLRecord.java |   5 +-
 .../nifi/processors/standard/LookupRecord.java     |   5 +-
 .../apache/nifi/processors/standard/PutSQL.java    |   2 +-
 .../processors/standard/QueryDatabaseTable.java    |  10 +-
 .../standard/QueryDatabaseTableRecord.java         |   4 +-
 .../standard/sql/DefaultAvroSqlWriter.java         |  12 +-
 .../processors/standard/sql/RecordSqlWriter.java   |  14 +-
 .../nifi/processors/standard/sql/SqlWriter.java    |   6 +-
 .../processors/standard/util/JdbcProperties.java   |  81 ++++++++
 .../nifi/processors/standard/TestExecuteSQL.java   |   6 +-
 .../processors/standard/TestExecuteSQLRecord.java  |   4 +-
 .../nifi-lookup-services/pom.xml                   |  29 ++-
 .../lookup/db/AbstractDatabaseLookupService.java   | 104 ++++++++++
 .../lookup/db/DatabaseRecordLookupService.java     | 206 ++++++++++++++++++
 .../lookup/db/SimpleDatabaseLookupService.java     | 174 ++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   2 +
 .../db/TestDatabaseRecordLookupService.groovy      | 229 +++++++++++++++++++++
 .../db/TestSimpleDatabaseLookupService.groovy      | 184 +++++++++++++++++
 35 files changed, 1322 insertions(+), 148 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index ee47c63..fc3d60f 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -55,9 +55,21 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final String FLOAT_CLASS_NAME = Float.class.getName();
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+        this(rs, readerSchema, false);
+    }
+
+    /**
+     * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
+     *
+     * @param rs The underlying ResultSet for this RecordSet
+     * @param readerSchema The schema to which this RecordSet adheres
+     * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable.
+     * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
+     */
+    public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
-        this.schema = createSchema(rs, readerSchema);
+        this.schema = createSchema(rs, readerSchema, allFieldsNullable);
 
         rsColumnNames = new HashSet<>();
         final ResultSetMetaData metadata = rs.getMetaData();
@@ -140,7 +152,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return value;
     }
 
-    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -154,7 +166,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
             final int nullableFlag = metadata.isNullable(column);
             final boolean nullable;
-            if (nullableFlag == ResultSetMetaData.columnNoNulls) {
+            if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) {
                 nullable = false;
             } else {
                 nullable = true;
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
similarity index 55%
copy from nifi-nar-bundles/nifi-extension-utils/pom.xml
copy to nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
index b2c8f51..cf63b5e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -13,25 +13,19 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-bundles</artifactId>
+        <artifactId>nifi-extension-utils</artifactId>
         <version>1.10.0-SNAPSHOT</version>
     </parent>
-    <packaging>pom</packaging>
-    <artifactId>nifi-extension-utils</artifactId>
-    <description>
-        This module contains reusable utilities related to extensions that can be shared across NARs.
-    </description>
 
-    <modules>
-        <module>nifi-record-utils</module>
-        <module>nifi-hadoop-utils</module>
-        <module>nifi-processor-utils</module>
-        <module>nifi-reporting-utils</module>
-        <module>nifi-syslog-utils</module>
-    </modules>
+    <artifactId>nifi-database-test-utils</artifactId>
 
-</project>
+    <dependencies>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
new file mode 100644
index 0000000..89c2600
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+
+/**
+ * Test fixture that (re)creates the 'persons', 'products' and 'relationships' tables and fills them with pseudo-random rows.
+ */
+public class SimpleCommerceDataSet {
+    // SQL used by loadTestData2Database; all three tables are created with the same (id, name, code) columns.
+    static String dropPersons = "drop table persons";
+    static String dropProducts = "drop table products";
+    static String dropRelationships = "drop table relationships";
+    static String createPersons = "create table persons (id integer, name varchar(100), code integer)";
+    static String createProducts = "create table products (id integer, name varchar(100), code integer)";
+    static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)";
+    /** Drops (best effort) and recreates the three tables on {@code con}, then inserts nrOfPersons/nrOfProducts/nrOfRels rows; SQLExceptions from anything but the drops propagate. */
+    public static void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws SQLException {
+        // Prints three sample names; each call also advances the shared rng before any row is generated.
+        System.out.println(createRandomName());
+        System.out.println(createRandomName());
+        System.out.println(createRandomName());
+
+        final Statement st = con.createStatement();
+
+        // The tables may not exist yet, so any failure while dropping them is deliberately ignored.
+        try {
+            st.executeUpdate(dropPersons);
+        } catch (final Exception ignored) {
+        }
+
+        try {
+            st.executeUpdate(dropProducts);
+        } catch (final Exception ignored) {
+        }
+
+        try {
+            st.executeUpdate(dropRelationships);
+        } catch (final Exception ignored) {
+        }
+
+        st.executeUpdate(createPersons);
+        st.executeUpdate(createProducts);
+        st.executeUpdate(createRelationships);
+
+        for (int i = 0; i < nrOfPersons; i++)
+            loadPersons(st, i);
+
+        for (int i = 0; i < nrOfProducts; i++)
+            loadProducts(st, i);
+
+        for (int i = 0; i < nrOfRels; i++)
+            loadRelationships(st, i);
+
+        st.close(); // NOTE(review): not reached if a create or insert above throws, which leaves st open
+    }
+
+    static Random rng = new Random(53495); // fixed seed; this single instance is shared by all helpers below
+    /** Inserts one row into persons: id {@code nr}, a random name, and a code taken from rng.nextInt(469946). */
+    static private void loadPersons(Statement st, int nr) throws SQLException {
+        st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+    }
+    /** Inserts one row into products: id {@code nr}, a random name, and a code taken from rng.nextInt(469946). */
+    static private void loadProducts(Statement st, int nr) throws SQLException {
+        st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+    }
+    /** Inserts one row into relationships: id {@code nr}, a random name, and a code taken from rng.nextInt(469946). */
+    static private void loadRelationships(Statement st, int nr) throws SQLException {
+        st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+    }
+    /** Returns two random strings joined by a single space. */
+    static private String createRandomName() {
+        return createRandomString() + " " + createRandomString();
+    }
+    /** Returns 0 to 9 characters (so possibly an empty string), each picked from "ABCDEFGHIJ" using the shared rng. */
+    static private String createRandomString() {
+
+        final int length = rng.nextInt(10);
+        final String characters = "ABCDEFGHIJ";
+
+        final char[] text = new char[length];
+        for (int i = 0; i < length; i++) {
+            text[i] = characters.charAt(rng.nextInt(characters.length()));
+        }
+        return new String(text);
+    }
+    /** Loads the embedded Derby driver and opens "jdbc:derby:" + location with create=true; NOTE(review): not called anywhere within this class. */
+    private Connection createConnection(String location) throws ClassNotFoundException, SQLException {
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        return DriverManager.getConnection("jdbc:derby:" + location + ";create=true");
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml
new file mode 100644
index 0000000..3cc62e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-extension-utils</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-database-utils</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+        <!-- Other modules using nifi-database-utils are expected to have these APIs available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.187</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/org/apache/nifi/avro/data.avro</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/schema.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/simpleSchema.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue1.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue2.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords1.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords2.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
index 970c7c2..8bb2261 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import org.apache.avro.file.CodecFactory;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
similarity index 90%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index 3de86c7..e41b3cb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
@@ -100,12 +100,9 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -114,11 +111,11 @@ import javax.xml.bind.DatatypeConverter;
  */
 public class JdbcCommon {
 
-    private static final int MAX_DIGITS_IN_BIGINT = 19;
-    private static final int MAX_DIGITS_IN_INT = 9;
+    public static final int MAX_DIGITS_IN_BIGINT = 19;
+    public static final int MAX_DIGITS_IN_INT = 9;
     // Derived from MySQL default precision.
-    private static final int DEFAULT_PRECISION_VALUE = 10;
-    private static final int DEFAULT_SCALE_VALUE = 0;
+    public static final int DEFAULT_PRECISION_VALUE = 10;
+    public static final int DEFAULT_SCALE_VALUE = 0;
 
     public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$");
     public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
@@ -126,62 +123,6 @@ public class JdbcCommon {
 
     public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
 
-    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
-            .name("dbf-normalize")
-            .displayName("Normalize Table/Column Names")
-            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
-                    + "will be changed to underscores in order to build a valid Avro record.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
-            .name("dbf-user-logical-types")
-            .displayName("Use Avro Logical Types")
-            .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
-                    + "If disabled, written as string. "
-                    + "If enabled, Logical types are used and written as its underlying type, specifically, "
-                    + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
-                    + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), "
-                    + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, "
-                    + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
-                    + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
-            .name("dbf-default-precision")
-            .displayName("Default Decimal Precision")
-            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
-                    + " a specific 'precision' denoting number of available digits is required."
-                    + " Generally, precision is defined by column data type definition or database engines default."
-                    + " However undefined precision (0) can be returned from some database engines."
-                    + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
-            .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
-            .name("dbf-default-scale")
-            .displayName("Default Decimal Scale")
-            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
-                    + " a specific 'scale' denoting number of available decimal digits is required."
-                    + " Generally, scale is defined by column data type definition or database engines default."
-                    + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
-                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
-                    + " If a value has more decimals than specified scale, then the value will be rounded-up,"
-                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
-            .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .required(true)
-            .build();
-
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
index ad57158..cc8d29e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
index 9cf4fc1..fa584c0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -427,7 +425,7 @@ public class TestJdbcCommon {
         when(metadata.getPrecision(1)).thenReturn(dbPrecision);
         when(metadata.getScale(1)).thenReturn(expectedScale);
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
 
@@ -580,12 +578,12 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn("t_int");
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final short s = 25;
         when(rs.getObject(Mockito.anyInt())).thenReturn(s);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@@ -608,12 +606,12 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn(mockColumnName);
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final Long ret = 0L;
         when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@@ -636,12 +634,12 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn(mockColumnName);
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final Long ret = 0L;
         when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
similarity index 92%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
index eb736e2..e6f9743 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
@@ -40,8 +40,6 @@ import static java.sql.Types.INTEGER;
 import static java.sql.Types.SMALLINT;
 import static java.sql.Types.TINYINT;
 import static java.sql.Types.BIGINT;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -133,12 +131,12 @@ public class TestJdbcCommonConvertToAvro {
         when(metadata.getColumnName(1)).thenReturn("t_int");
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final int ret = 0;
         when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
index 499127b..e44024a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static org.junit.Assert.assertEquals;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
index 2c3eb58..37af3ac 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static org.junit.Assert.assertNotNull;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
index c4f6071..5f594df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static org.junit.Assert.assertNotNull;
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index b2c8f51..ccec552 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -32,6 +32,8 @@
         <module>nifi-processor-utils</module>
         <module>nifi-reporting-utils</module>
         <module>nifi-syslog-utils</module>
+        <module>nifi-database-utils</module>
+        <module>nifi-database-test-utils</module>
     </modules>
 
 </project>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 7c1a13c..fb10f6a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -344,6 +344,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-database-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-standard-web-test-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <scope>test</scope>
@@ -354,6 +359,12 @@
             <version>2.0.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-database-test-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index e013a5c..212febc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -33,8 +33,8 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.nio.charset.Charset;
 import java.sql.Connection;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 6b166d9..1df0ae2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -37,8 +37,8 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.sql.Connection;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index cfdef29..f058b77 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -39,13 +39,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
+import org.apache.nifi.util.db.JdbcCommon;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.util.db.AvroUtil.CodecType;
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 80d33c0..897a929 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -32,8 +32,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,7 +41,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index b9686b2..96a8d3e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -73,7 +73,7 @@ import java.util.stream.Collectors;
     @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
     @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
 })
-@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
+@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "database", "db", "logs", "convert", "filter"})
 @CapabilityDescription("Extracts one or more fields from a Record and looks up a value for those fields in a LookupService. If a result is returned by the LookupService, "
     + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
     + "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), "
@@ -87,7 +87,8 @@ import java.util.stream.Collectors;
     + "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.")
 @DynamicProperty(name = "Value To Lookup", value = "Valid Record Path", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
                     description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
-@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"})
+@SeeAlso(value = {ConvertRecord.class, SplitRecord.class},
+        classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"})
 public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> {
 
     private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 8834821..6a4e3a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -51,8 +51,8 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
 import org.apache.nifi.processor.util.pattern.PutGroup;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.processor.util.pattern.RoutingResult;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1089370..b8cc75c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -35,7 +35,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,10 +43,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
 
 
 @TriggerSerially
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 4464842..371d225 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -35,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,7 +44,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
 
 
 @TriggerSerially
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
index 574aca7..d5b51c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
@@ -19,8 +19,7 @@ package org.apache.nifi.processors.standard.sql;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -29,20 +28,23 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
 public class DefaultAvroSqlWriter implements SqlWriter {
 
-    private final JdbcCommon.AvroConversionOptions options;
+    private final AvroConversionOptions options;
 
     private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
         put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
     }};
 
-    public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
+    public DefaultAvroSqlWriter(AvroConversionOptions options) {
         this.options = options;
     }
 
     @Override
-    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception {
         try {
             return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
         } catch (SQLException e) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index c1a76b4..d5d798b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -22,8 +22,6 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -32,6 +30,7 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -41,6 +40,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
 public class RecordSqlWriter implements SqlWriter {
 
     private final RecordSetWriterFactory recordSetWriterFactory;
@@ -52,7 +54,7 @@ public class RecordSqlWriter implements SqlWriter {
     private RecordSchema writeSchema;
     private String mimeType;
 
-    public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
+    public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
         this.recordSetWriterFactory = recordSetWriterFactory;
         this.writeResultRef = new AtomicReference<>();
         this.maxRowsPerFlowFile = maxRowsPerFlowFile;
@@ -61,7 +63,7 @@ public class RecordSqlWriter implements SqlWriter {
     }
 
     @Override
-    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception {
         final RecordSet recordSet;
         try {
             if (fullRecordSet == null) {
@@ -129,9 +131,9 @@ public class RecordSqlWriter implements SqlWriter {
 
     private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
 
-        private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
+        private final ResultSetRowCallback callback;
 
-        ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
+        ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, ResultSetRowCallback callback) throws SQLException {
             super(rs, readerSchema);
             this.callback = callback;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
index 08fc3fd..abbe842 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard.sql;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -26,6 +25,9 @@ import java.sql.ResultSet;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
+
 /**
  * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
  * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord
@@ -42,7 +44,7 @@ public interface SqlWriter {
      * @return the number of rows written to the output stream
      * @throws Exception if any errors occur during the writing of the result set to the output stream
      */
-    long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
+    long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception;
 
     /**
      * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
new file mode 100644
index 0000000..4683cbc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.db.JdbcCommon;
+
+public class JdbcProperties {
+    // Holder for PropertyDescriptor constants describing JDBC-to-Avro conversion settings; it declares no other members.
+    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
+            .name("dbf-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+                    + "will be changed to underscores in order to build a valid Avro record.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
+            .name("dbf-user-logical-types") // NOTE(review): "user" looks like a typo for "use"; renaming changes the property name - confirm impact first
+            .displayName("Use Avro Logical Types")
+            .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
+                    + "If disabled, written as string. "
+                    + "If enabled, Logical types are used and written as its underlying type, specifically, "
+                    + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
+                    + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), " // NOTE(review): the Avro spec names this type 'date' - confirm
+                    + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, " // NOTE(review): Avro defines time-millis as ms after midnight - confirm
+                    + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+                    + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
+            .name("dbf-default-precision")
+            .displayName("Default Decimal Precision")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'precision' denoting number of available digits is required."
+                    + " Generally, precision is defined by column data type definition or database engines default."
+                    + " However undefined precision (0) can be returned from some database engines."
+                    + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
+            .defaultValue(String.valueOf(JdbcCommon.DEFAULT_PRECISION_VALUE)) // default is taken from the shared JdbcCommon constant
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
+            .name("dbf-default-scale")
+            .displayName("Default Decimal Scale")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'scale' denoting number of available decimal digits is required."
+                    + " Generally, scale is defined by column data type definition or database engines default."
+                    + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
+                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
+                    + " If a value has more decimals than specified scale, then the value will be rounded-up,"
+                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.") // NOTE(review): these examples match half-up rounding, not rounding up - confirm
+            .defaultValue(String.valueOf(JdbcCommon.DEFAULT_SCALE_VALUE)) // default is taken from the shared JdbcCommon constant
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) // unlike precision, a scale of 0 is accepted
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 5458434..9961c6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -46,12 +46,12 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.AvroUtil;
-import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.AvroUtil;
+import org.apache.nifi.util.db.SimpleCommerceDataSet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -540,7 +540,7 @@ public class TestExecuteSQL {
 
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
-        TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+        SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
         LOGGER.info("test data loaded");
 
         // ResultSet size will be 1x200x100 = 20 000 rows
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 6f6a091..375cb98 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -28,7 +28,6 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -36,6 +35,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.SimpleCommerceDataSet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -492,7 +492,7 @@ public class TestExecuteSQLRecord {
 
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
-        TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+        SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
         LOGGER.info("test data loaded");
 
         // ResultSet size will be 1x200x100 = 20 000 rows
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
index 2429ca4..ee3ba71 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
@@ -83,9 +83,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
+            <artifactId>nifi-database-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -105,6 +104,24 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.burgstaller</groupId>
+            <artifactId>okhttp-digest</artifactId>
+            <version>1.18</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock-record-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <scope>test</scope>
@@ -141,10 +158,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.burgstaller</groupId>
-            <artifactId>okhttp-digest</artifactId>
-            <version>1.18</version>
-            <scope>compile</scope>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
     <build>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
new file mode 100644
index 0000000..e91e6d7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class AbstractDatabaseLookupService extends AbstractControllerService {
+    // Common property descriptors and state for database-backed lookup services; subclasses populate "properties".
+    static final String KEY = "key"; // name of the only coordinate listed in REQUIRED_KEYS
+
+    static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+    static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table to be queried. Note that this may be case-sensitive depending on the database.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-key-column")
+            .displayName("Lookup Key Column")
+            .description("The column in the table that will serve as the lookup key. This is the column that will be matched against "
+                    + "the property specified in the lookup processor. Note that this may be case-sensitive depending on the database.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-cache-size")
+            .displayName("Cache Size")
+            .description("Specifies how many lookup values/records should be cached. The cache is shared for all tables and keeps a map of lookup values to records. "
+                    + "Setting this property to zero means no caching will be done and the table will be queried for each lookup value in each record. If the lookup "
+                    + "table changes often or the most recent data must be retrieved, do not use the cache.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("0")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CLEAR_CACHE_ON_ENABLED = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-clear-cache-on-enabled")
+            .displayName("Clear Cache on Enabled")
+            .description("Whether to clear the cache when this service is enabled. If the Cache Size is zero then this property is ignored. Clearing the cache when the "
+                    + "service is enabled ensures that the service will first go to the database to get the most recent data.")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("true")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
+            .name("Cache Expiration")
+            .description("Time interval to clear all cache entries. If the Cache Size is zero then this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected List<PropertyDescriptor> properties; // not assigned in this class; subclasses set it (e.g. in init())
+    // Assigned when the service is enabled and read by lookup calls, so it is volatile like lookupKeyColumn below
+    volatile DBCPService dbcpService;
+
+    volatile String lookupKeyColumn;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties; // returned as-is; null until a subclass has populated the list
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
new file mode 100644
index 0000000..fdb1452
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.db.JdbcCommon;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value", "record"})
+@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, "
+        + "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. Only one row "
+        + "will be returned for each lookup, duplicate database entries are ignored.")
+public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
+
+    private volatile Cache<Tuple<String, Object>, Record> cache;
+    private volatile JdbcCommon.AvroConversionOptions options;
+
+    static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-value-columns")
+            .displayName("Lookup Value Columns")
+            .description("A comma-delimited list of columns in the table that will be returned when the lookup key matches. Note that this may be case-sensitive depending on the database.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    /** Registers the property descriptors supported by this service as an unmodifiable list. */
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        this.properties = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(
+                DBCP_SERVICE, TABLE_NAME, LOOKUP_KEY_COLUMN, LOOKUP_VALUE_COLUMNS, CACHE_SIZE, CLEAR_CACHE_ON_ENABLED, CACHE_EXPIRATION)));
+    }
+
+    /**
+     * Captures the DBCP service and lookup key column, builds the cache on first enable (or again when Cache Size is
+     * positive and Clear Cache on Enabled is true), and prepares the Avro conversion options used by lookups.
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        final boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
+        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
+        if (this.cache == null || (cacheSize > 0 && clearCache)) {
+            if (durationNanos > 0) {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .expireAfter(new Expiry<Tuple<String, Object>, Record>() {
+                            @Override
+                            public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime) {
+                                return durationNanos;
+                            }
+
+                            @Override
+                            public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
+                                return currentDuration; // leave the remaining lifetime untouched on update
+                            }
+
+                            @Override
+                            public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
+                                return currentDuration; // leave the remaining lifetime untouched on read
+                            }
+                        })
+                        .build();
+            } else {
+                this.cache = Caffeine.newBuilder().maximumSize(cacheSize).build();
+            }
+        }
+
+        options = JdbcCommon.AvroConversionOptions.builder()
+                .recordName("NiFi_DB_Record_Lookup")
+                .maxRows(1) // Ignore duplicates
+                .convertNames(false) // Keep column names as field names
+                .useLogicalTypes(true)
+                .build();
+    }
+
+    /** Convenience overload that performs the lookup with a {@code null} context map. */
+    @Override
+    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        return lookup(coordinates, null);
+    }
+
+    /**
+     * Returns the first row of the configured table whose lookup key column equals the "key" coordinate. The cache is
+     * consulted first; on a miss the table is queried and a row that is found is cached.
+     * @param coordinates lookup coordinates; null coordinates or a null/blank "key" value produce an empty result
+     * @param context attributes used to evaluate Table Name and Lookup Value Columns and to obtain the connection
+     * @return the matching row as a Record, or empty if there is nothing to look up or no row matches
+     * @throws LookupFailureException if executing the query or reading its result fails
+     */
+    @Override
+    public Optional<Record> lookup(final Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        final Object key = coordinates.get(KEY);
+        if (key == null || StringUtils.isBlank(key.toString())) {
+            return Optional.empty();
+        }
+
+        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
+        final String lookupValueColumnsList = getProperty(LOOKUP_VALUE_COLUMNS).evaluateAttributeExpressions(context).getValue();
+
+        Set<String> lookupValueColumnsSet = new LinkedHashSet<>();
+        if (lookupValueColumnsList != null) {
+            Stream.of(lookupValueColumnsList)
+                    .flatMap(path -> Arrays.stream(path.split(",")))
+                    .filter(DatabaseRecordLookupService::isNotBlank)
+                    .map(String::trim)
+                    .forEach(lookupValueColumnsSet::add);
+        }
+
+        final String lookupValueColumns = lookupValueColumnsSet.isEmpty() ? "*" : String.join(",", lookupValueColumnsSet);
+        // NOTE(review): the cache key ignores the value columns, so a cached Record may carry columns from an earlier column list
+        Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
+
+        // Not using the function param of cache.get so we can catch and handle the checked exceptions
+        Record foundRecord = cache.get(cacheLookupKey, k -> null);
+
+        if (foundRecord == null) {
+            final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
+            try (final Connection con = dbcpService.getConnection(context);
+                 final PreparedStatement st = con.prepareStatement(selectQuery)) {
+                st.setObject(1, key);
+                ResultSet resultSet = st.executeQuery();
+                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
+                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
+                ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
+                foundRecord = resultSetRecordSet.next();
+                // Populate the cache if the record is present
+                if (foundRecord != null) {
+                    cache.put(cacheLookupKey, foundRecord);
+                }
+            } catch (SQLException se) {
+                throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for value " + key.toString()
+                        + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
+            } catch (IOException ioe) {
+                throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + " for value " + key.toString()
+                        + " : " + (ioe.getCause() == null ? ioe.getMessage() : ioe.getCause().getMessage()), ioe);
+            }
+        }
+
+        return Optional.ofNullable(foundRecord);
+    }
+
+    private static boolean isNotBlank(final String value) {
+        return value != null && !value.trim().isEmpty();
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
new file mode 100644
index 0000000..f649582
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Lookup service that resolves a single column value from a database table for a given key,
+ * caching the values it finds.
+ */
+@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value"})
+@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, " +
+        "the specified lookup value column is returned. Only one value will be returned for each lookup, duplicate database entries are ignored.")
+public class SimpleDatabaseLookupService extends AbstractDatabaseLookupService implements StringLookupService {
+
+    // Found values keyed by (table name, lookup key); created in onEnabled
+    private volatile Cache<Tuple<String, Object>, String> cache;
+
+    static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
+            new PropertyDescriptor.Builder()
+                    .name("lookup-value-column")
+                    .displayName("Lookup Value Column")
+                    .description("The column whose value will be returned when the Lookup value is matched")
+                    .required(true)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                    .build();
+
+    /**
+     * Registers the property descriptors exposed by this service.
+     *
+     * @param context the initialization context (not used here)
+     */
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DBCP_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(LOOKUP_KEY_COLUMN);
+        properties.add(LOOKUP_VALUE_COLUMN);
+        properties.add(CACHE_SIZE);
+        properties.add(CLEAR_CACHE_ON_ENABLED);
+        properties.add(CACHE_EXPIRATION);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    /**
+     * Captures the connection pool and key column, and creates the cache when none exists yet
+     * or when clearing is requested and the configured cache size is positive.
+     *
+     * @param context the configuration to read property values from
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
+        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
+        if (this.cache == null || (cacheSize > 0 && clearCache)) {
+            if (durationNanos > 0) {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .expireAfter(new Expiry<Tuple<String, Object>, Object>() {
+                            // The lifetime is fixed when the entry is created ...
+                            @Override
+                            public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime) {
+                                return durationNanos;
+                            }
+
+                            // ... and updates or reads leave the remaining duration untouched
+                            @Override
+                            public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
+                                return currentDuration;
+                            }
+
+                            @Override
+                            public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
+                                return currentDuration;
+                            }
+                        })
+                        .build();
+            } else {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .build();
+            }
+        }
+    }
+
+    /**
+     * Looks up a value without any context attributes.
+     *
+     * @param coordinates the lookup coordinates
+     * @return the result of {@link #lookup(Map, Map)} called with a null context
+     * @throws LookupFailureException if the SQL statement fails
+     */
+    @Override
+    public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        return lookup(coordinates, null);
+    }
+
+    /**
+     * Returns the lookup value column of the first row whose key column equals the {@code KEY} coordinate.
+     * A value already present in the cache is returned without querying the database.
+     *
+     * @param coordinates the lookup coordinates; the entry under {@code KEY} is bound as the query parameter
+     * @param context attributes used to evaluate the table and value column names and to obtain the connection
+     * @return the column value as a String; empty when the coordinates or key are missing, the key is blank,
+     *         no row matches, or the column value is NULL
+     * @throws LookupFailureException if the SQL statement fails
+     */
+    @Override
+    public Optional<String> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        // A missing key is handled like a blank one rather than failing with a NullPointerException
+        final Object key = coordinates.get(KEY);
+        if (key == null || StringUtils.isBlank(key.toString())) {
+            return Optional.empty();
+        }
+
+        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
+        final String lookupValueColumn = getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions(context).getValue();
+
+        Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
+
+        // Not using the function param of cache.get so we can catch and handle the checked exceptions
+        String foundRecord = cache.get(cacheLookupKey, k -> null);
+
+        if (foundRecord == null) {
+            // Only the key is bound as a parameter; table and column names come from the service configuration
+            final String selectQuery = "SELECT " + lookupValueColumn + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
+            try (final Connection con = dbcpService.getConnection(context);
+                 final PreparedStatement st = con.prepareStatement(selectQuery)) {
+
+                st.setObject(1, key);
+                try (final ResultSet resultSet = st.executeQuery()) {
+                    if (!resultSet.next()) {
+                        return Optional.empty();
+                    }
+
+                    final Object o = resultSet.getObject(lookupValueColumn);
+                    if (o == null) {
+                        return Optional.empty();
+                    }
+                    foundRecord = o.toString();
+                }
+
+                // Populate the cache if the record is present
+                if (foundRecord != null) {
+                    cache.put(cacheLookupKey, foundRecord);
+                }
+
+            } catch (SQLException se) {
+                throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for value " + key.toString()
+                        + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
+            }
+        }
+
+        return Optional.ofNullable(foundRecord);
+    }
+
+    /**
+     * Returns the coordinate key names held in {@code REQUIRED_KEYS}.
+     *
+     * @return the shared {@code REQUIRED_KEYS} set
+     */
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 631fdaa..06d7622 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,9 +14,11 @@
 # limitations under the License.
 org.apache.nifi.lookup.maxmind.IPLookupService
 org.apache.nifi.lookup.CSVRecordLookupService
+org.apache.nifi.lookup.db.DatabaseRecordLookupService
 org.apache.nifi.lookup.PropertiesFileLookupService
 org.apache.nifi.lookup.RestLookupService
 org.apache.nifi.lookup.SimpleKeyValueLookupService
 org.apache.nifi.lookup.SimpleCsvFileLookupService
+org.apache.nifi.lookup.db.SimpleDatabaseLookupService
 org.apache.nifi.lookup.XMLFileLookupService
 org.apache.nifi.lookup.DistributedMapCacheLookupService
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
new file mode 100644
index 0000000..860a90d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.assertEquals
+import static org.junit.Assert.assertNull
+import static org.junit.Assert.assertThat
+
+
+/**
+ * Exercises DatabaseRecordLookupService against an embedded Derby database.
+ */
+class TestDatabaseRecordLookupService {
+
+    private TestRunner runner
+
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
+    private final static String DB_LOCATION = "target/db"
+
+    @BeforeClass
+    static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log")
+    }
+
+    @Before
+    void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl()
+        final Map<String, String> dbcpProperties = new HashMap<>()
+
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+        runner.addControllerService("dbcp", dbcp, dbcpProperties)
+        runner.enableControllerService(dbcp)
+    }
+
+    /**
+     * Recreates the TEST table with its two fixture rows and releases the JDBC resources used to do so.
+     */
+    private void loadTestData() {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        try {
+            final Statement stmt = con.createStatement()
+            try {
+                try {
+                    stmt.execute("drop table TEST")
+                } catch (final SQLException ignored) {
+                    // dropping fails when the table does not exist yet; it is created right below
+                }
+
+                stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+                stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+                stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+            } finally {
+                stmt.close()
+            }
+        } finally {
+            con.close()
+        }
+    }
+
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        loadTestData()
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
+        assertEquals("Hello", property1.get().getAsString("VAL2"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+        assertEquals("World", property2.get().getAsString("VAL2"))
+
+        // Key not found
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+    }
+
+    @Test
+    void testDatabaseLookupServiceSpecifyColumns() throws InitializationException, IOException, LookupFailureException {
+        loadTestData()
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_VALUE_COLUMNS, "val1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+
+        // Key not found
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+    }
+
+    @Test
+    void exerciseCacheLogic() {
+        loadTestData()
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, DatabaseRecordLookupService.CACHE_SIZE, "10")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Each key is looked up twice so the second call can be served from the cache
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property1.get().getAsInt("VAL1"))
+        assertEquals("World", property1.get().getAsString("VAL2"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+        assertEquals("World", property2.get().getAsString("VAL2"))
+
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull(property3.get().getAsInt("VAL1"))
+        assertEquals("Hello", property3.get().getAsString("VAL2"))
+
+        final Optional<Record> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull(property4.get().getAsInt("VAL1"))
+        assertEquals("Hello", property4.get().getAsString("VAL2"))
+    }
+
+    /**
+     * Simple implementation for component testing.
+     *
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                // keep the original exception as the cause so its stack trace is not lost
+                throw new ProcessException("getConnection failed: " + e, e)
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy
new file mode 100644
index 0000000..e255771
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.*
+
+/**
+ * Exercises SimpleDatabaseLookupService against an embedded Derby database.
+ */
+class TestSimpleDatabaseLookupService {
+
+    private TestRunner runner
+
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
+    private final static String DB_LOCATION = "target/db"
+
+    @BeforeClass
+    static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log")
+    }
+
+    @Before
+    void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl()
+        final Map<String, String> dbcpProperties = new HashMap<>()
+
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+        runner.addControllerService("dbcp", dbcp, dbcpProperties)
+        runner.enableControllerService(dbcp)
+    }
+
+    /**
+     * Recreates the TEST table with its two fixture rows and releases the JDBC resources used to do so.
+     */
+    private void loadTestData() {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        try {
+            final Statement stmt = con.createStatement()
+            try {
+                try {
+                    stmt.execute("drop table TEST")
+                } catch (final SQLException ignored) {
+                    // dropping fails when the table does not exist yet; it is created right below
+                }
+
+                stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+                stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+                stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+            } finally {
+                stmt.close()
+            }
+        } finally {
+            con.close()
+        }
+    }
+
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        loadTestData()
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1: the row with id 0 holds NULL there, so nothing is returned
+        final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(property1.isPresent())
+        // Key not found
+        final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+
+        // Switch the value column to VAL2 and look up again
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", property2.get())
+    }
+
+    @Test
+    void exerciseCacheLogic() {
+        loadTestData()
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.CACHE_SIZE, "10")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1
+        final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("1", property1.get())
+        final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(property3.isPresent())
+
+
+        // After re-enabling with VAL2 the lookups must return VAL2 values, not the VAL1 values seen above
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", property2.get())
+
+        final Optional<String> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertEquals("Hello", property4.get())
+    }
+
+    /**
+     * Simple implementation for component testing.
+     *
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                // keep the original exception as the cause so its stack trace is not lost
+                throw new ProcessException("getConnection failed: " + e, e)
+            }
+        }
+    }
+}
\ No newline at end of file


[nifi] 02/02: NIFI-6082: Refactor the way to handle fields nullable

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4db5446c878a9be1d621429686f2835dc642d550
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 13 16:50:01 2019 +0900

    NIFI-6082: Refactor the way to handle fields nullable
    
    - Make enriched fields nullable at LookupRecord.
    - Removed unnecessary AvroConversionOptions and reader schema creation,
    because ResultSetRecordSet can generate NiFi Record Schema from RS
    directly. No Avro schema is needed to do that.
---
 .../serialization/record/ResultSetRecordSet.java   | 18 ++--------
 .../nifi/processors/standard/LookupRecord.java     | 11 ++++--
 .../nifi/processors/standard/TestLookupRecord.java | 40 ++++++++++++++++++++++
 .../lookup/db/DatabaseRecordLookupService.java     | 18 +---------
 4 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index fc3d60f..ee47c63 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -55,21 +55,9 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final String FLOAT_CLASS_NAME = Float.class.getName();
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
-        this(rs, readerSchema, false);
-    }
-
-    /**
-     * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
-     *
-     * @param rs The underlying ResultSet for this RecordSet
-     * @param readerSchema The schema to which this RecordSet adheres
-     * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable.
-     * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
-     */
-    public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
-        this.schema = createSchema(rs, readerSchema, allFieldsNullable);
+        this.schema = createSchema(rs, readerSchema);
 
         rsColumnNames = new HashSet<>();
         final ResultSetMetaData metadata = rs.getMetaData();
@@ -152,7 +140,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return value;
     }
 
-    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
+    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -166,7 +154,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
             final int nullableFlag = metadata.isNullable(column);
             final boolean nullable;
-            if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) {
+            if (nullableFlag == ResultSetMetaData.columnNoNulls) {
                 nullable = false;
             } else {
                 nullable = true;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 96a8d3e..23d1325 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -316,7 +316,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
             if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
                 final Record lookupRecord = (Record) lookupValue;
 
-                // Use wants to add all fields of the resultant Record to the specified Record Path.
+                // User wants to add all fields of the resultant Record to the specified Record Path.
                 // If the destination Record Path returns to us a Record, then we will add all field values of
                 // the Lookup Record to the destination Record. However, if the destination Record Path returns
                 // something other than a Record, then we can't add the fields to it. We can only replace it,
@@ -332,7 +332,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
 
                             final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
                             if (recordFieldOption.isPresent()) {
-                                destinationRecord.setValue(recordFieldOption.get(), value);
+                                // When matched and unmatched records are written to the same FlowFile (routed to the 'success'
+                                // relationship), unmatched records will have null values for the enriched fields. So even if the
+                                // looked-up field is declared non-nullable, it is added to the destination record as a nullable field.
+                                RecordField field = recordFieldOption.get();
+                                if (!routeToMatchedUnmatched && !field.isNullable()) {
+                                    field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
+                                }
+                                destinationRecord.setValue(field, value);
                             } else {
                                 destinationRecord.setValue(fieldName, value);
                             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index 3efd9d1..f8fb158 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -395,6 +395,46 @@ public class TestLookupRecord {
             || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
     }
 
+    @Test
+    public void testAddFieldsToExistingRecordRouteToSuccess() throws InitializationException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
+
+        // "favorite" is declared non-nullable in the lookup result's schema, yet the unmatched record below must still be written without it.
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType(), false));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType(), true));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+
+        // The incoming schema has only "name" and "age"; "Jane Doe" has no lookup entry, so her output row gets no enriched values.
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48);
+        recordReader.addRecord("Jane Doe", 47);
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
+        out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
+    }
 
     private static class MapLookup extends AbstractControllerService implements StringLookupService {
         private final Map<String, String> values = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
index fdb1452..b176b33 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
@@ -19,12 +19,10 @@ package org.apache.nifi.lookup.db;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Expiry;
-import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
@@ -34,10 +32,8 @@ import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.RecordLookupService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.Tuple;
-import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -62,7 +58,6 @@ import java.util.stream.Stream;
 public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
 
     private volatile Cache<Tuple<String, Object>, Record> cache;
-    private volatile JdbcCommon.AvroConversionOptions options;
 
     static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
             .name("dbrecord-lookup-value-columns")
@@ -120,15 +115,6 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
                         .build();
             }
         }
-
-        options = JdbcCommon.AvroConversionOptions.builder()
-                .recordName("NiFi_DB_Record_Lookup")
-                // Ignore duplicates
-                .maxRows(1)
-                // Keep column names as field names
-                .convertNames(false)
-                .useLogicalTypes(true)
-                .build();
     }
 
     @Override
@@ -173,9 +159,7 @@ public class DatabaseRecordLookupService extends AbstractDatabaseLookupService i
 
                 st.setObject(1, key);
                 ResultSet resultSet = st.executeQuery();
-                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
-                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
-                ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
+                ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, null);
                 foundRecord = resultSetRecordSet.next();
 
                 // Populate the cache if the record is present