Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/14 00:34:30 UTC

[nifi] 01/02: NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ca76fe178cfae8890b347081260cd59a62321219
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Feb 27 16:17:46 2019 -0500

    NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils
    
    NIFI-6082: Added SimpleDatabaseLookupService
    
    NIFI-6082: Merged Koji's improvements, incorporated review comments
    
    This closes #3341.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../serialization/record/ResultSetRecordSet.java   |  18 +-
 .../{ => nifi-database-test-utils}/pom.xml         |  26 +--
 .../apache/nifi/util/db/SimpleCommerceDataSet.java | 112 ++++++++++
 .../nifi-database-utils/pom.xml                    | 101 +++++++++
 .../java/org/apache/nifi/util/db}/AvroUtil.java    |   2 +-
 .../java/org/apache/nifi/util/db}/JdbcCommon.java  |  71 +------
 .../apache/nifi/util/db}/JdbcCommonTestUtils.java  |   2 +-
 .../org/apache/nifi/util/db}/TestJdbcCommon.java   |  18 +-
 .../nifi/util/db}/TestJdbcCommonConvertToAvro.java |   8 +-
 .../apache/nifi/util/db}/TestJdbcHugeStream.java   |   2 +-
 .../apache/nifi/util/db}/TestJdbcTypesDerby.java   |   2 +-
 .../org/apache/nifi/util/db}/TestJdbcTypesH2.java  |   2 +-
 nifi-nar-bundles/nifi-extension-utils/pom.xml      |   2 +
 .../nifi-standard-processors/pom.xml               |  11 +
 .../processors/standard/AbstractExecuteSQL.java    |   2 +-
 .../standard/AbstractQueryDatabaseTable.java       |   2 +-
 .../nifi/processors/standard/ExecuteSQL.java       |  12 +-
 .../nifi/processors/standard/ExecuteSQLRecord.java |   5 +-
 .../nifi/processors/standard/LookupRecord.java     |   5 +-
 .../apache/nifi/processors/standard/PutSQL.java    |   2 +-
 .../processors/standard/QueryDatabaseTable.java    |  10 +-
 .../standard/QueryDatabaseTableRecord.java         |   4 +-
 .../standard/sql/DefaultAvroSqlWriter.java         |  12 +-
 .../processors/standard/sql/RecordSqlWriter.java   |  14 +-
 .../nifi/processors/standard/sql/SqlWriter.java    |   6 +-
 .../processors/standard/util/JdbcProperties.java   |  81 ++++++++
 .../nifi/processors/standard/TestExecuteSQL.java   |   6 +-
 .../processors/standard/TestExecuteSQLRecord.java  |   4 +-
 .../nifi-lookup-services/pom.xml                   |  29 ++-
 .../lookup/db/AbstractDatabaseLookupService.java   | 104 ++++++++++
 .../lookup/db/DatabaseRecordLookupService.java     | 206 ++++++++++++++++++
 .../lookup/db/SimpleDatabaseLookupService.java     | 174 ++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   2 +
 .../db/TestDatabaseRecordLookupService.groovy      | 229 +++++++++++++++++++++
 .../db/TestSimpleDatabaseLookupService.groovy      | 184 +++++++++++++++++
 35 files changed, 1322 insertions(+), 148 deletions(-)
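
For orientation, a minimal sketch of how a record lookup service of this kind
is typically invoked through NiFi's generic LookupService interface; the
coordinate key name "key" and the helper class are illustrative assumptions,
not code from this commit:

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.nifi.lookup.LookupFailedException;
    import org.apache.nifi.lookup.LookupService;
    import org.apache.nifi.serialization.record.Record;

    public class LookupSketch {
        // Hypothetical helper: fetch one enrichment Record for a single key value.
        static Optional<Record> lookupOne(LookupService<Record> service, Object keyValue)
                throws LookupFailedException {
            // Coordinates map a coordinate name to the value to look up;
            // the name "key" is assumed here for illustration.
            return service.lookup(Collections.singletonMap("key", keyValue));
        }
    }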

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index ee47c63..fc3d60f 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -55,9 +55,21 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final String FLOAT_CLASS_NAME = Float.class.getName();
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+        this(rs, readerSchema, false);
+    }
+
+    /**
+     * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema
+     *
+     * @param rs The underlying ResultSet for this RecordSet
+     * @param readerSchema The schema to which this RecordSet adheres
+     * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true, all fields in the RecordSet are nullable.
+     * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata
+     */
+    public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
-        this.schema = createSchema(rs, readerSchema);
+        this.schema = createSchema(rs, readerSchema, allFieldsNullable);
 
         rsColumnNames = new HashSet<>();
         final ResultSetMetaData metadata = rs.getMetaData();
@@ -140,7 +152,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return value;
     }
 
-    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException {
+    private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -154,7 +166,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
             final int nullableFlag = metadata.isNullable(column);
             final boolean nullable;
-            if (nullableFlag == ResultSetMetaData.columnNoNulls) {
+            if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) {
                 nullable = false;
             } else {
                 nullable = true;
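
To illustrate the new constructor, a hedged usage sketch (the query, the null
reader schema, and the connection handling are assumptions for brevity):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.ResultSetRecordSet;

    public class NullableSchemaSketch {
        static void readAllNullable(Connection conn) throws Exception {
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT id, name FROM persons")) {
                // 'true' overrides columnNoNulls metadata so every field in the
                // derived schema is nullable; a null reader schema is assumed to
                // be acceptable, as with the two-argument constructor.
                ResultSetRecordSet recordSet = new ResultSetRecordSet(rs, null, true);
                Record record;
                while ((record = recordSet.next()) != null) {
                    System.out.println(record);
                }
            }
        }
    }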
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
similarity index 55%
copy from nifi-nar-bundles/nifi-extension-utils/pom.xml
copy to nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
index b2c8f51..cf63b5e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -13,25 +13,19 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-bundles</artifactId>
+        <artifactId>nifi-extension-utils</artifactId>
         <version>1.10.0-SNAPSHOT</version>
     </parent>
-    <packaging>pom</packaging>
-    <artifactId>nifi-extension-utils</artifactId>
-    <description>
-        This module contains reusable utilities related to extensions that can be shared across NARs.
-    </description>
 
-    <modules>
-        <module>nifi-record-utils</module>
-        <module>nifi-hadoop-utils</module>
-        <module>nifi-processor-utils</module>
-        <module>nifi-reporting-utils</module>
-        <module>nifi-syslog-utils</module>
-    </modules>
+    <artifactId>nifi-database-test-utils</artifactId>
 
-</project>
+    <dependencies>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
new file mode 100644
index 0000000..89c2600
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+
+/**
+ * A sample data set for tests, consisting of 'persons', 'products' and 'relationships' tables.
+ */
+public class SimpleCommerceDataSet {
+
+    static String dropPersons = "drop table persons";
+    static String dropProducts = "drop table products";
+    static String dropRelationships = "drop table relationships";
+    static String createPersons = "create table persons (id integer, name varchar(100), code integer)";
+    static String createProducts = "create table products (id integer, name varchar(100), code integer)";
+    static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)";
+
+    public static void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws SQLException {
+
+        System.out.println(createRandomName());
+        System.out.println(createRandomName());
+        System.out.println(createRandomName());
+
+        final Statement st = con.createStatement();
+
+        // Tables may not exist; this is not a serious problem.
+        try {
+            st.executeUpdate(dropPersons);
+        } catch (final Exception ignored) {
+        }
+
+        try {
+            st.executeUpdate(dropProducts);
+        } catch (final Exception ignored) {
+        }
+
+        try {
+            st.executeUpdate(dropRelationships);
+        } catch (final Exception ignored) {
+        }
+
+        st.executeUpdate(createPersons);
+        st.executeUpdate(createProducts);
+        st.executeUpdate(createRelationships);
+
+        for (int i = 0; i < nrOfPersons; i++)
+            loadPersons(st, i);
+
+        for (int i = 0; i < nrOfProducts; i++)
+            loadProducts(st, i);
+
+        for (int i = 0; i < nrOfRels; i++)
+            loadRelationships(st, i);
+
+        st.close();
+    }
+
+    static Random rng = new Random(53495);
+
+    static private void loadPersons(Statement st, int nr) throws SQLException {
+        st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+    }
+
+    static private void loadProducts(Statement st, int nr) throws SQLException {
+        st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+    }
+
+    static private void loadRelationships(Statement st, int nr) throws SQLException {
+        st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
+    }
+
+    static private String createRandomName() {
+        return createRandomString() + " " + createRandomString();
+    }
+
+    static private String createRandomString() {
+
+        final int length = rng.nextInt(10);
+        final String characters = "ABCDEFGHIJ";
+
+        final char[] text = new char[length];
+        for (int i = 0; i < length; i++) {
+            text[i] = characters.charAt(rng.nextInt(characters.length()));
+        }
+        return new String(text);
+    }
+
+    private Connection createConnection(String location) throws ClassNotFoundException, SQLException {
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        return DriverManager.getConnection("jdbc:derby:" + location + ";create=true");
+    }
+
+}
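
A hedged sketch of seeding an embedded Derby database with this data set in a
test; the JDBC location "target/testdb" and the row counts are illustrative:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.nifi.util.db.SimpleCommerceDataSet;

    public class DataSetSketch {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
            try (Connection con = DriverManager.getConnection(
                    "jdbc:derby:target/testdb;create=true")) {
                // Seed 100 persons, 50 products, and 25 relationships.
                SimpleCommerceDataSet.loadTestData2Database(con, 100, 50, 25);
            }
        }
    }
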
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml
new file mode 100644
index 0000000..3cc62e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-extension-utils</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-database-utils</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.8.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+        <!-- Other modules using nifi-database-utils are expected to have these APIs available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.187</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/org/apache/nifi/avro/data.avro</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/schema.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/simpleSchema.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue1.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue2.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords1.json</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords2.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
index 970c7c2..8bb2261 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import org.apache.avro.file.CodecFactory;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
similarity index 90%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index 3de86c7..e41b3cb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
@@ -100,12 +100,9 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -114,11 +111,11 @@ import javax.xml.bind.DatatypeConverter;
  */
 public class JdbcCommon {
 
-    private static final int MAX_DIGITS_IN_BIGINT = 19;
-    private static final int MAX_DIGITS_IN_INT = 9;
+    public static final int MAX_DIGITS_IN_BIGINT = 19;
+    public static final int MAX_DIGITS_IN_INT = 9;
     // Derived from MySQL default precision.
-    private static final int DEFAULT_PRECISION_VALUE = 10;
-    private static final int DEFAULT_SCALE_VALUE = 0;
+    public static final int DEFAULT_PRECISION_VALUE = 10;
+    public static final int DEFAULT_SCALE_VALUE = 0;
 
     public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$");
     public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
@@ -126,62 +123,6 @@ public class JdbcCommon {
 
     public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
 
-    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
-            .name("dbf-normalize")
-            .displayName("Normalize Table/Column Names")
-            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
-                    + "will be changed to underscores in order to build a valid Avro record.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
-            .name("dbf-user-logical-types")
-            .displayName("Use Avro Logical Types")
-            .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
-                    + "If disabled, written as string. "
-                    + "If enabled, Logical types are used and written as its underlying type, specifically, "
-                    + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
-                    + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), "
-                    + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, "
-                    + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
-                    + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
-            .name("dbf-default-precision")
-            .displayName("Default Decimal Precision")
-            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
-                    + " a specific 'precision' denoting number of available digits is required."
-                    + " Generally, precision is defined by column data type definition or database engines default."
-                    + " However undefined precision (0) can be returned from some database engines."
-                    + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
-            .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
-            .name("dbf-default-scale")
-            .displayName("Default Decimal Scale")
-            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
-                    + " a specific 'scale' denoting number of available decimal digits is required."
-                    + " Generally, scale is defined by column data type definition or database engines default."
-                    + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
-                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
-                    + " If a value has more decimals than specified scale, then the value will be rounded-up,"
-                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
-            .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .required(true)
-            .build();
-
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
     }
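
A hedged sketch of the relocated utility in use; the query and the in-memory
output stream are illustrative assumptions:

    import java.io.ByteArrayOutputStream;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import org.apache.nifi.util.db.JdbcCommon;

    public class AvroStreamSketch {
        static byte[] toAvro(Connection conn) throws Exception {
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT * FROM persons");
                 ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                // convertNames=true normalizes column names to Avro-safe identifiers.
                long rows = JdbcCommon.convertToAvroStream(rs, out, true);
                System.out.println(rows + " rows written");
                return out.toByteArray();
            }
        }
    }
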
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
index ad57158..cc8d29e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
similarity index 97%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
index 9cf4fc1..fa584c0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -427,7 +425,7 @@ public class TestJdbcCommon {
         when(metadata.getPrecision(1)).thenReturn(dbPrecision);
         when(metadata.getScale(1)).thenReturn(expectedScale);
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
 
@@ -580,12 +578,12 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn("t_int");
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final short s = 25;
         when(rs.getObject(Mockito.anyInt())).thenReturn(s);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@@ -608,12 +606,12 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn(mockColumnName);
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final Long ret = 0L;
         when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
@@ -636,12 +634,12 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn(mockColumnName);
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final Long ret = 0L;
         when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
similarity index 92%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
index eb736e2..e6f9743 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
@@ -40,8 +40,6 @@ import static java.sql.Types.INTEGER;
 import static java.sql.Types.SMALLINT;
 import static java.sql.Types.TINYINT;
 import static java.sql.Types.BIGINT;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
-import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -133,12 +131,12 @@ public class TestJdbcCommonConvertToAvro {
         when(metadata.getColumnName(1)).thenReturn("t_int");
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = resultSetReturningMetadata(metadata);
+        final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata);
 
         final int ret = 0;
         when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
 
-        final InputStream instream = convertResultSetToAvroInputStream(rs);
+        final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
index 499127b..e44024a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static org.junit.Assert.assertEquals;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
index 2c3eb58..37af3ac 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static org.junit.Assert.assertNotNull;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
similarity index 99%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
index c4f6071..5f594df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.standard.util;
+package org.apache.nifi.util.db;
 
 import static org.junit.Assert.assertNotNull;
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index b2c8f51..ccec552 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -32,6 +32,8 @@
         <module>nifi-processor-utils</module>
         <module>nifi-reporting-utils</module>
         <module>nifi-syslog-utils</module>
+        <module>nifi-database-utils</module>
+        <module>nifi-database-test-utils</module>
     </modules>
 
 </project>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 7c1a13c..fb10f6a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -344,6 +344,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-database-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-standard-web-test-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <scope>test</scope>
@@ -354,6 +359,12 @@
             <version>2.0.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-database-test-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index e013a5c..212febc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -33,8 +33,8 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.nio.charset.Charset;
 import java.sql.Connection;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index 6b166d9..1df0ae2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -37,8 +37,8 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.sql.Connection;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index cfdef29..f058b77 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -39,13 +39,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
+import org.apache.nifi.util.db.JdbcCommon;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.util.db.AvroUtil.CodecType;
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 80d33c0..897a929 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -32,8 +32,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,7 +41,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
+
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index b9686b2..96a8d3e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -73,7 +73,7 @@ import java.util.stream.Collectors;
     @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
     @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
 })
-@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
+@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "database", "db", "logs", "convert", "filter"})
 @CapabilityDescription("Extracts one or more fields from a Record and looks up a value for those fields in a LookupService. If a result is returned by the LookupService, "
     + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
     + "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), "
@@ -87,7 +87,8 @@ import java.util.stream.Collectors;
     + "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.")
 @DynamicProperty(name = "Value To Lookup", value = "Valid Record Path", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
                     description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
-@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"})
+@SeeAlso(value = {ConvertRecord.class, SplitRecord.class},
+        classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"})
 public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> {
 
     private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 8834821..6a4e3a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -51,8 +51,8 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup;
 import org.apache.nifi.processor.util.pattern.PutGroup;
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
 import org.apache.nifi.processor.util.pattern.RoutingResult;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1089370..b8cc75c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -35,7 +35,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,10 +43,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
 
 
 @TriggerSerially
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 4464842..371d225 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -35,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.processors.standard.sql.SqlWriter;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,7 +44,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
 
 
 @TriggerSerially
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
index 574aca7..d5b51c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
@@ -19,8 +19,7 @@ package org.apache.nifi.processors.standard.sql;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -29,20 +28,23 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
 public class DefaultAvroSqlWriter implements SqlWriter {
 
-    private final JdbcCommon.AvroConversionOptions options;
+    private final AvroConversionOptions options;
 
     private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
         put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
     }};
 
-    public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
+    public DefaultAvroSqlWriter(AvroConversionOptions options) {
         this.options = options;
     }
 
     @Override
-    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception {
         try {
             return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
         } catch (SQLException e) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index c1a76b4..d5d798b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -22,8 +22,6 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
-import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -32,6 +30,7 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.db.JdbcCommon;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -41,6 +40,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions;
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
 public class RecordSqlWriter implements SqlWriter {
 
     private final RecordSetWriterFactory recordSetWriterFactory;
@@ -52,7 +54,7 @@ public class RecordSqlWriter implements SqlWriter {
     private RecordSchema writeSchema;
     private String mimeType;
 
-    public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
+    public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
         this.recordSetWriterFactory = recordSetWriterFactory;
         this.writeResultRef = new AtomicReference<>();
         this.maxRowsPerFlowFile = maxRowsPerFlowFile;
@@ -61,7 +63,7 @@ public class RecordSqlWriter implements SqlWriter {
     }
 
     @Override
-    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception {
         final RecordSet recordSet;
         try {
             if (fullRecordSet == null) {
@@ -129,9 +131,9 @@ public class RecordSqlWriter implements SqlWriter {
 
     private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
 
-        private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
+        private final ResultSetRowCallback callback;
 
-        ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
+        ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, ResultSetRowCallback callback) throws SQLException {
             super(rs, readerSchema);
             this.callback = callback;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
index 08fc3fd..abbe842 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard.sql;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -26,6 +25,9 @@ import java.sql.ResultSet;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback;
+
+
 /**
  * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
  * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord
@@ -42,7 +44,7 @@ public interface SqlWriter {
      * @return the number of rows written to the output stream
      * @throws Exception if any errors occur during the writing of the result set to the output stream
      */
-    long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
+    long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception;
 
     /**
      * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
new file mode 100644
index 0000000..4683cbc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.db.JdbcCommon;
+
+public class JdbcProperties {
+
+    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
+            .name("dbf-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+                    + "will be changed to underscores in order to build a valid Avro record.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
+            .name("dbf-user-logical-types")
+            .displayName("Use Avro Logical Types")
+            .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
+                    + "If disabled, written as string. "
+                    + "If enabled, Logical types are used and written as its underlying type, specifically, "
+                    + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
+                    + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), "
+                    + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, "
+                    + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+                    + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
+            .name("dbf-default-precision")
+            .displayName("Default Decimal Precision")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'precision' denoting number of available digits is required."
+                    + " Generally, precision is defined by column data type definition or database engines default."
+                    + " However undefined precision (0) can be returned from some database engines."
+                    + " 'Default Decimal Precision' is used when writing those undefined precision numbers.")
+            .defaultValue(String.valueOf(JdbcCommon.DEFAULT_PRECISION_VALUE))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
+            .name("dbf-default-scale")
+            .displayName("Default Decimal Scale")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'scale' denoting number of available decimal digits is required."
+                    + " Generally, scale is defined by column data type definition or database engines default."
+                    + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines."
+                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
+                    + " If a value has more decimals than specified scale, then the value will be rounded-up,"
+                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
+            .defaultValue(String.valueOf(JdbcCommon.DEFAULT_SCALE_VALUE))
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+}
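
For illustration, here is a minimal standalone sketch of the behavior the Default Decimal Precision/Scale properties above describe when a driver reports an undefined precision of 0. The class and helper names are hypothetical; the half-up rounding mirrors the 1.53 example in the 'Default Decimal Scale' description.

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    // Hypothetical sketch: when the driver reports precision 0, fall back to the
    // configured default scale and round any extra decimal digits half-up.
    class DecimalDefaultsSketch {
        static BigDecimal applyDefaultScale(BigDecimal raw, int reportedPrecision, int defaultScale) {
            return reportedPrecision == 0 ? raw.setScale(defaultScale, RoundingMode.HALF_UP) : raw;
        }

        public static void main(String[] args) {
            System.out.println(applyDefaultScale(new BigDecimal("1.53"), 0, 0)); // prints 2
            System.out.println(applyDefaultScale(new BigDecimal("1.53"), 0, 1)); // prints 1.5
        }
    }
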
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 5458434..9961c6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -46,12 +46,12 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.AvroUtil;
-import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.AvroUtil;
+import org.apache.nifi.util.db.SimpleCommerceDataSet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -540,7 +540,7 @@ public class TestExecuteSQL {
 
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
-        TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+        SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
         LOGGER.info("test data loaded");
 
         // ResultSet size will be 1x200x100 = 20 000 rows
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 6f6a091..375cb98 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -28,7 +28,6 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -36,6 +35,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.db.SimpleCommerceDataSet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -492,7 +492,7 @@ public class TestExecuteSQLRecord {
 
         // load test data to database
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
-        TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
+        SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
         LOGGER.info("test data loaded");
 
         // ResultSet size will be 1x200x100 = 20 000 rows
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
index 2429ca4..ee3ba71 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
@@ -83,9 +83,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
+            <artifactId>nifi-database-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -105,6 +104,24 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.burgstaller</groupId>
+            <artifactId>okhttp-digest</artifactId>
+            <version>1.18</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock-record-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <scope>test</scope>
@@ -141,10 +158,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.burgstaller</groupId>
-            <artifactId>okhttp-digest</artifactId>
-            <version>1.18</version>
-            <scope>compile</scope>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
     <build>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
new file mode 100644
index 0000000..e91e6d7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class AbstractDatabaseLookupService extends AbstractControllerService {
+
+    static final String KEY = "key";
+
+    static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+    static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-dbcp-service")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table to be queried. Note that this may be case-sensitive depending on the database.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-key-column")
+            .displayName("Lookup Key Column")
+            .description("The column in the table that will serve as the lookup key. This is the column that will be matched against "
+                    + "the property specified in the lookup processor. Note that this may be case-sensitive depending on the database.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-cache-size")
+            .displayName("Cache Size")
+            .description("Specifies how many lookup values/records should be cached. The cache is shared for all tables and keeps a map of lookup values to records. "
+                    + "Setting this property to zero means no caching will be done and the table will be queried for each lookup value in each record. If the lookup "
+                    + "table changes often or the most recent data must be retrieved, do not use the cache.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("0")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CLEAR_CACHE_ON_ENABLED = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-clear-cache-on-enabled")
+            .displayName("Clear Cache on Enabled")
+            .description("Whether to clear the cache when this service is enabled. If the Cache Size is zero then this property is ignored. Clearing the cache when the "
+                    + "service is enabled ensures that the service will first go to the database to get the most recent data.")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("true")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
+            .name("Cache Expiration")
+            .description("Time interval to clear all cache entries. If the Cache Size is zero then this property is ignored.")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected List<PropertyDescriptor> properties;
+
+    DBCPService dbcpService;
+
+    volatile String lookupKeyColumn;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+}
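
The cache-related properties above are shared by both concrete services added in this commit. As a hedged configuration sketch (property values are illustrative; it reuses the TestRunner pattern from the Groovy tests below and the DatabaseRecordLookupService defined next):

    // Illustrative wiring only: a service with a 100-entry cache whose entries
    // expire after 5 minutes and survive disable/enable cycles.
    TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
    DatabaseRecordLookupService service = new DatabaseRecordLookupService();
    runner.addControllerService("db-lookup-service", service);
    runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp");
    runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST");
    runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id");
    runner.setProperty(service, DatabaseRecordLookupService.CACHE_SIZE, "100");
    runner.setProperty(service, DatabaseRecordLookupService.CACHE_EXPIRATION, "5 mins");
    runner.setProperty(service, DatabaseRecordLookupService.CLEAR_CACHE_ON_ENABLED, "false");
    runner.enableControllerService(service);
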
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
new file mode 100644
index 0000000..fdb1452
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/DatabaseRecordLookupService.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.db.JdbcCommon;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value", "record"})
+@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, "
+        + "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. Only one row "
+        + "will be returned for each lookup, duplicate database entries are ignored.")
+public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {
+
+    private volatile Cache<Tuple<String, Object>, Record> cache;
+    private volatile JdbcCommon.AvroConversionOptions options;
+
+    static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
+            .name("dbrecord-lookup-value-columns")
+            .displayName("Lookup Value Columns")
+            .description("A comma-delimited list of columns in the table that will be returned when the lookup key matches. Note that this may be case-sensitive depending on the database.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DBCP_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(LOOKUP_KEY_COLUMN);
+        properties.add(LOOKUP_VALUE_COLUMNS);
+        properties.add(CACHE_SIZE);
+        properties.add(CLEAR_CACHE_ON_ENABLED);
+        properties.add(CACHE_EXPIRATION);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        final boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
+        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
+        if (this.cache == null || (cacheSize > 0 && clearCache)) {
+            if (durationNanos > 0) {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .expireAfter(new Expiry<Tuple<String, Object>, Record>() {
+                            @Override
+                            public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime) {
+                                return durationNanos;
+                            }
+
+                            @Override
+                            public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
+                                return currentDuration;
+                            }
+
+                            @Override
+                            public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
+                                return currentDuration;
+                            }
+                        })
+                        .build();
+            } else {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .build();
+            }
+        }
+
+        options = JdbcCommon.AvroConversionOptions.builder()
+                .recordName("NiFi_DB_Record_Lookup")
+                // Ignore duplicates
+                .maxRows(1)
+                // Keep column names as field names
+                .convertNames(false)
+                .useLogicalTypes(true)
+                .build();
+    }
+
+    @Override
+    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        return lookup(coordinates, null);
+    }
+
+    @Override
+    public Optional<Record> lookup(final Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        final Object key = coordinates.get(KEY);
+        if (key == null || StringUtils.isBlank(key.toString())) {
+            return Optional.empty();
+        }
+
+        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
+        final String lookupValueColumnsList = getProperty(LOOKUP_VALUE_COLUMNS).evaluateAttributeExpressions(context).getValue();
+
+        Set<String> lookupValueColumnsSet = new LinkedHashSet<>();
+        if (lookupValueColumnsList != null) {
+            Stream.of(lookupValueColumnsList)
+                    .flatMap(path -> Arrays.stream(path.split(",")))
+                    .filter(StringUtils::isNotBlank)
+                    .map(String::trim)
+                    .forEach(lookupValueColumnsSet::add);
+        }
+
+        final String lookupValueColumns = lookupValueColumnsSet.isEmpty() ? "*" : String.join(",", lookupValueColumnsSet);
+
+        Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
+
+        // Not using the function param of cache.get so we can catch and handle the checked exceptions
+        Record foundRecord = cache.get(cacheLookupKey, k -> null);
+
+        if (foundRecord == null) {
+            final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
+            try (final Connection con = dbcpService.getConnection(context);
+                 final PreparedStatement st = con.prepareStatement(selectQuery)) {
+
+                st.setObject(1, key);
+                ResultSet resultSet = st.executeQuery();
+                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
+                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
+                ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
+                foundRecord = resultSetRecordSet.next();
+
+                // Populate the cache if the record is present
+                if (foundRecord != null) {
+                    cache.put(cacheLookupKey, foundRecord);
+                }
+
+            } catch (SQLException se) {
+                throw new LookupFailureException("Error executing SQL statement: " + selectQuery + "for value " + key.toString()
+                        + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
+            } catch (IOException ioe) {
+                throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + "for value " + key.toString()
+                        + " : " + (ioe.getCause() == null ? ioe.getMessage() : ioe.getCause().getMessage()), ioe);
+            }
+        }
+
+        return Optional.ofNullable(foundRecord);
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
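
A short usage sketch of the lookup contract above ('key' is the single required coordinate; the service instance and table contents are assumed from the tests later in this commit):

    // The caller (e.g. LookupRecord) supplies the lookup key under the "key" coordinate
    Map<String, Object> coordinates = Collections.singletonMap("key", 1);
    Optional<Record> result = lookupService.lookup(coordinates);
    // When a row matches, the selected columns are available on the returned Record
    result.ifPresent(r -> System.out.println(r.getAsString("VAL2")));
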
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
new file mode 100644
index 0000000..f649582
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/SimpleDatabaseLookupService.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value"})
+@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, " +
+        "the specified lookup value column is returned. Only one value will be returned for each lookup, duplicate database entries are ignored.")
+public class SimpleDatabaseLookupService extends AbstractDatabaseLookupService implements StringLookupService {
+
+    private volatile Cache<Tuple<String, Object>, String> cache;
+
+    static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
+            new PropertyDescriptor.Builder()
+                    .name("lookup-value-column")
+                    .displayName("Lookup Value Column")
+                    .description("The column whose value will be returned when the Lookup value is matched")
+                    .required(true)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                    .build();
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DBCP_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(LOOKUP_KEY_COLUMN);
+        properties.add(LOOKUP_VALUE_COLUMN);
+        properties.add(CACHE_SIZE);
+        properties.add(CLEAR_CACHE_ON_ENABLED);
+        properties.add(CACHE_EXPIRATION);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
+        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet() ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS) : 0L;
+        if (this.cache == null || (cacheSize > 0 && clearCache)) {
+            if (durationNanos > 0) {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .expireAfter(new Expiry<Tuple<String, Object>, Object>() {
+                            @Override
+                            public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime) {
+                                return durationNanos;
+                            }
+
+                            @Override
+                            public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
+                                return currentDuration;
+                            }
+
+                            @Override
+                            public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
+                                return currentDuration;
+                            }
+                        })
+                        .build();
+            } else {
+                this.cache = Caffeine.newBuilder()
+                        .maximumSize(cacheSize)
+                        .build();
+            }
+        }
+    }
+
+    @Override
+    public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        return lookup(coordinates, null);
+    }
+
+    @Override
+    public Optional<String> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        final Object key = coordinates.get(KEY);
+        if (key == null || StringUtils.isBlank(key.toString())) {
+            return Optional.empty();
+        }
+
+        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
+        final String lookupValueColumn = getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions(context).getValue();
+
+        Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);
+
+        // Not using the function param of cache.get so we can catch and handle the checked exceptions
+        String foundRecord = cache.get(cacheLookupKey, k -> null);
+
+        if (foundRecord == null) {
+            final String selectQuery = "SELECT " + lookupValueColumn + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
+            try (final Connection con = dbcpService.getConnection(context);
+                 final PreparedStatement st = con.prepareStatement(selectQuery)) {
+
+                st.setObject(1, key);
+                ResultSet resultSet = st.executeQuery();
+
+                if (!resultSet.next()) {
+                    return Optional.empty();
+                }
+
+                Object o = resultSet.getObject(lookupValueColumn);
+                if (o == null) {
+                    return Optional.empty();
+                }
+                foundRecord = o.toString();
+
+                // o is known to be non-null at this point, so the value can be cached directly
+                cache.put(cacheLookupKey, foundRecord);
+
+            } catch (SQLException se) {
+                throw new LookupFailureException("Error executing SQL statement: " + selectQuery + "for value " + key.toString()
+                        + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
+            }
+        }
+
+        return Optional.ofNullable(foundRecord);
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
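
Note that both services key their cache on a Tuple of (table name, lookup key), so the same key looked up against two different tables produces two independent cache entries. A minimal sketch of that keying, assuming Tuple's value-based equality (which the cache lookups above rely on):

    Cache<Tuple<String, Object>, String> cache = Caffeine.newBuilder().maximumSize(10).build();
    cache.put(new Tuple<>("TEST", 1), "World");
    cache.put(new Tuple<>("OTHER", 1), "Bonjour");
    // Two distinct entries despite the identical key value
    System.out.println(cache.get(new Tuple<>("TEST", 1), k -> null)); // prints World
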
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 631fdaa..06d7622 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,9 +14,11 @@
 # limitations under the License.
 org.apache.nifi.lookup.maxmind.IPLookupService
 org.apache.nifi.lookup.CSVRecordLookupService
+org.apache.nifi.lookup.db.DatabaseRecordLookupService
 org.apache.nifi.lookup.PropertiesFileLookupService
 org.apache.nifi.lookup.RestLookupService
 org.apache.nifi.lookup.SimpleKeyValueLookupService
 org.apache.nifi.lookup.SimpleCsvFileLookupService
+org.apache.nifi.lookup.db.SimpleDatabaseLookupService
 org.apache.nifi.lookup.XMLFileLookupService
 org.apache.nifi.lookup.DistributedMapCacheLookupService
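
The two new entries above follow the standard java.util.ServiceLoader registration format; a generic sketch of that discovery mechanism (NiFi's actual NAR-based class loading is more involved):

    // Generic sketch: enumerate ControllerService implementations registered via META-INF/services
    ServiceLoader<ControllerService> loader = ServiceLoader.load(ControllerService.class);
    loader.forEach(cs -> System.out.println(cs.getClass().getName()));
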
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
new file mode 100644
index 0000000..860a90d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.assertEquals
+import static org.junit.Assert.assertNull
+import static org.junit.Assert.assertThat
+
+
+class TestDatabaseRecordLookupService {
+
+    private TestRunner runner
+
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
+    private final static String DB_LOCATION = "target/db"
+
+    @BeforeClass
+    static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log")
+    }
+
+    @Before
+    void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl()
+        final Map<String, String> dbcpProperties = new HashMap<>()
+
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+        runner.addControllerService("dbcp", dbcp, dbcpProperties)
+        runner.enableControllerService(dbcp)
+    }
+
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            stmt.execute("drop table TEST")
+        } catch (final SQLException sqle) {
+            // Ignore the exception; the table may not exist yet
+        }
+
+        stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
+        assertEquals("Hello", property1.get().getAsString("VAL2"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+        assertEquals("World", property2.get().getAsString("VAL2"))
+
+        // Key not found
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+    }
+
+    @Test
+    void testDatabaseLookupServiceSpecifyColumns() throws InitializationException, IOException, LookupFailureException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            stmt.execute("drop table TEST")
+        } catch (final SQLException sqle) {
+            // Ignore the exception; the table may not exist yet
+        }
+
+        stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_VALUE_COLUMNS, "val1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+
+        // Key not found
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, property3)
+    }
+
+    @Test
+    void exerciseCacheLogic() {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            stmt.execute("drop table TEST")
+        } catch (final SQLException sqle) {
+            // Ignore the exception; the table may not exist yet
+        }
+
+        stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+
+        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, DatabaseRecordLookupService.CACHE_SIZE, "10")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property1.get().getAsInt("VAL1"))
+        assertEquals("World", property1.get().getAsString("VAL2"))
+
+        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals(1, property2.get().getAsInt("VAL1"))
+        assertEquals("World", property2.get().getAsString("VAL2"))
+
+        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull(property3.get().getAsInt("VAL1"))
+        assertEquals("Hello", property3.get().getAsString("VAL2"))
+
+        final Optional<Record> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertNull(property4.get().getAsInt("VAL1"))
+        assertEquals("Hello", property4.get().getAsString("VAL2"))
+    }
+
+    /**
+     * Simple implementation for component testing.
+     *
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy
new file mode 100644
index 0000000..e255771
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestSimpleDatabaseLookupService.groovy
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.*
+
+class TestSimpleDatabaseLookupService {
+
+    private TestRunner runner
+
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
+    private final static String DB_LOCATION = "target/db"
+
+    @BeforeClass
+    static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log")
+    }
+
+    @Before
+    void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl()
+        final Map<String, String> dbcpProperties = new HashMap<>()
+
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+        runner.addControllerService("dbcp", dbcp, dbcpProperties)
+        runner.enableControllerService(dbcp)
+    }
+
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            stmt.execute("drop table TEST")
+        } catch (final SQLException sqle) {
+            // Ignore the exception; the table may not exist yet
+        }
+
+        stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1 for key 0: VAL1 is NULL for that row, so the result should be empty
+        final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(property1.isPresent())
+        // Key 2 does not exist, so the lookup should return an empty Optional
+        final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RESULT, property3)
+
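+        // The service must be disabled before its properties can change; switch the value column to VAL2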
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", property2.get())
+    }
+
+    @Test
+    void exerciseCacheLogic() {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        final Statement stmt = con.createStatement()
+
+        try {
+            stmt.execute("drop table TEST")
+        } catch (final SQLException sqle) {
+            // Ignore: the table may not exist on the first run
+        }
+
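+        // Same fixture as above: row 0 has a NULL VAL1, row 1 has VAL1 = 1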
+        stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+        stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
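+        // Enable caching so repeated lookups for the same key can be served from the cache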
+        runner.setProperty(service, SimpleDatabaseLookupService.CACHE_SIZE, "10")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1 for key 1: expect its VAL1 value, "1"
+        final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("1", property1.get())
+        // VAL1 is NULL for key 0, so the result should be empty
+        final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(property3.isPresent())
+
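+        // Disabling the service clears the cache; after re-enabling with VAL2, lookups should return
+        // fresh VAL2 values rather than stale cached VAL1 results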
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", property2.get())
+
+        final Optional<String> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertEquals("Hello", property4.get())
+    }
+
+    /**
+     * Simple DBCPService implementation that opens an embedded Derby database, for component testing.
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                // Load the embedded Derby driver and open (or create) the test database
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                throw new ProcessException("getConnection failed: " + e)
+            }
+        }
+    }
+}
\ No newline at end of file