You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2021/04/01 11:41:15 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1450 - Revive SQL processors

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 02b0fb5  MINIFICPP-1450 - Revive SQL processors
02b0fb5 is described below

commit 02b0fb545cdf19948f7c456fc8d643e75965d9f6
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Thu Apr 1 13:40:18 2021 +0200

    MINIFICPP-1450 - Revive SQL processors
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1004
---
 .github/workflows/ci.yml                           |  36 +-
 CMakeLists.txt                                     |  14 +-
 PROCESSORS.md                                      |  49 ++-
 README.md                                          |   3 +-
 bootstrap.sh                                       |  13 +-
 bstrp_functions.sh                                 |  38 +-
 cmake/DockerConfig.cmake                           |   1 -
 docker/DockerBuild.sh                              |   2 -
 docker/Dockerfile                                  |   3 +-
 .../expression-language/ProcessContextExpr.cpp     |   4 +-
 extensions/sql/CMakeLists.txt                      |  49 ++-
 extensions/sql/SQLLoader.h                         |   3 +-
 extensions/sql/cmake/FindODBC.cmake                |  34 --
 extensions/sql/data/DatabaseConnectors.h           |   9 +-
 extensions/sql/data/JSONSQLWriter.cpp              |  59 +--
 extensions/sql/data/JSONSQLWriter.h                |  17 +-
 extensions/sql/data/MaxCollector.h                 | 111 ++---
 extensions/sql/data/SQLRowSubscriber.h             |   6 +-
 extensions/sql/data/SQLRowsetProcessor.cpp         |  36 +-
 extensions/sql/data/SQLRowsetProcessor.h           |   9 +-
 extensions/sql/data/SQLWriter.h                    |   1 -
 extensions/sql/data/Utils.cpp                      |  32 +-
 extensions/sql/data/Utils.h                        |   2 -
 extensions/sql/patch/soci.patch                    |  18 -
 extensions/sql/processors/ExecuteSQL.cpp           | 118 ++---
 extensions/sql/processors/ExecuteSQL.h             |  41 +-
 extensions/sql/processors/FlowFileSource.cpp       |  76 ++++
 extensions/sql/processors/FlowFileSource.h         | 103 +++++
 extensions/sql/processors/OutputFormat.cpp         |  57 ---
 extensions/sql/processors/OutputFormat.h           |  52 ---
 extensions/sql/processors/PutSQL.cpp               |  82 ++--
 extensions/sql/processors/PutSQL.h                 |  34 +-
 extensions/sql/processors/QueryDatabaseTable.cpp   | 480 ++++++++-------------
 extensions/sql/processors/QueryDatabaseTable.h     |  77 ++--
 extensions/sql/processors/SQLProcessor.cpp         |  95 ++++
 extensions/sql/processors/SQLProcessor.h           |  75 +---
 extensions/sql/services/ODBCConnector.h            |  15 +-
 extensions/sqlite/CMakeLists.txt                   |  33 --
 extensions/sqlite/ExecuteSQL.cpp                   | 190 --------
 extensions/sqlite/ExecuteSQL.h                     |  86 ----
 extensions/sqlite/PutSQL.cpp                       | 190 --------
 extensions/sqlite/PutSQL.h                         |  88 ----
 extensions/sqlite/SQLiteConnection.h               | 270 ------------
 .../tests/CWELCustomProviderTests.cpp              |   1 +
 extensions/windows-event-log/tests/CWELTestUtils.h |  28 --
 .../tests/ConsumeWindowsEventLogTests.cpp          |   1 +
 libminifi/include/utils/StringUtils.h              |   3 +-
 libminifi/src/utils/StringUtils.cpp                |   5 +
 .../data/WriteCallback.h => libminifi/test/Path.h  |  51 ++-
 libminifi/test/TestBase.cpp                        |  31 +-
 libminifi/test/TestBase.h                          |  23 +-
 libminifi/test/Utils.h                             |  34 ++
 libminifi/test/sql-tests/CMakeLists.txt            |  42 ++
 libminifi/test/sql-tests/ExecuteSQLTests.cpp       | 196 +++++++++
 libminifi/test/sql-tests/FlowFileMatcher.h         |  74 ++++
 libminifi/test/sql-tests/PutSQLTests.cpp           |  78 ++++
 .../test/sql-tests/QueryDatabaseTableTests.cpp     | 251 +++++++++++
 libminifi/test/sql-tests/SQLTestController.h       | 111 +++++
 libminifi/test/sql-tests/SQLTestPlan.h             |  90 ++++
 libminifi/test/sqlite-tests/CMakeLists.txt         |  39 --
 libminifi/test/sqlite-tests/SQLiteTests.cpp        | 472 --------------------
 61 files changed, 1865 insertions(+), 2376 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index dee602d..c23c19e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,7 +20,7 @@ jobs:
             macos-xcode11.2.1-ccache-${{github.ref}}-
             macos-xcode11.2.1-ccache-refs/heads/main-
       - id: install_dependencies
-        run: brew install ossp-uuid boost flex openssl python lua@5.3 xz libssh2 ccache
+        run: brew install ossp-uuid boost flex openssl python lua@5.3 xz libssh2 ccache sqliteodbc
       - id: setup_env
         run: |
           echo "PATH=/usr/lib/ccache:/usr/local/opt/ccache/bin:/usr/local/opt/ccache/libexec:$PATH" >> $GITHUB_ENV
@@ -30,7 +30,7 @@ jobs:
         run: |
           export PATH="/usr/local/opt/lua@5.3/lib:/usr/local/opt/lua@5.3/include:/usr/local/opt/lua@5.3/bin:$PATH"
           export PKG_CONFIG_PATH="/usr/local/opt/lua@5.3/lib/pkgconfig"
-          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DENABLE_AWS=ON -DENABLE_AZURE=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
+          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
   macos_xcode_12_0:
     name: "macos-xcode12.0"
     runs-on: macos-10.15
@@ -50,7 +50,7 @@ jobs:
             macos-xcode12.0-ccache-${{github.ref}}-
             macos-xcode12.0-ccache-refs/heads/main-
       - id: install_dependencies
-        run: brew install ossp-uuid boost flex openssl python lua@5.3 xz libssh2 ccache
+        run: brew install ossp-uuid boost flex openssl python lua@5.3 xz libssh2 ccache sqliteodbc
       - id: setup_env
         run: |
           echo "PATH=/usr/lib/ccache:/usr/local/opt/ccache/bin:/usr/local/opt/ccache/libexec:$PATH" >> $GITHUB_ENV
@@ -60,7 +60,7 @@ jobs:
         run: |
           export PATH="/usr/local/opt/lua@5.3/lib:/usr/local/opt/lua@5.3/include:/usr/local/opt/lua@5.3/bin:$PATH"
           export PKG_CONFIG_PATH="/usr/local/opt/lua@5.3/lib/pkgconfig"
-          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
+          ./bootstrap.sh -e -t && cd build  && cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_LUA_SCRIPTING=1 -DENABLE_SQL=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j4 --output-on-failure" && make linter
   windows_VS2017:
     name: "windows-vs2017"
     runs-on: windows-2016
@@ -70,6 +70,11 @@ jobs:
         uses: actions/checkout@v2
       - name: Setup PATH
         uses: microsoft/setup-msbuild@v1.0.2
+      - id: install-sqliteodbc-driver
+        run: |
+          Invoke-WebRequest -Uri "http://www.ch-werner.de/sqliteodbc/sqliteodbc.exe" -OutFile "sqliteodbc.exe"
+          ./sqliteodbc.exe /S
+        shell: powershell
       - id: build
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.17763.0\x86
@@ -85,11 +90,16 @@ jobs:
         uses: actions/checkout@v2
       - name: Setup PATH
         uses: microsoft/setup-msbuild@v1.0.2
+      - id: install-sqliteodbc-driver
+        run: |
+          Invoke-WebRequest -Uri "http://www.ch-werner.de/sqliteodbc/sqliteodbc_w64.exe" -OutFile "sqliteodbc_w64.exe"
+          ./sqliteodbc_w64.exe /S
+        shell: powershell
       - id: build
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64
           PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
-          win_build_vs.bat build /2019 /64 /CI
+          win_build_vs.bat build /2019 /64 /CI /S /A
         shell: cmd
   ubuntu_16_04:
     name: "ubuntu-16.04"
@@ -132,12 +142,13 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev
+          sudo apt install -y gcc-4.8 g++-4.8 bison flex uuid-dev openssl libcurl4-openssl-dev ccache libpython3-dev liblua5.1-0-dev libssh2-1-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so
           echo "PATH=/usr/lib/ccache:$PATH" >> $GITHUB_ENV
           sudo unlink /usr/bin/gcc && sudo ln -s /usr/bin/gcc-4.8 /usr/bin/gcc
           sudo unlink /usr/bin/g++ && sudo ln -s /usr/bin/g++-4.8 /usr/bin/g++
       - id: build
-        run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DCMAKE_BUILD_TYPE=Release -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j2 --output-on-failure"
+        run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DCMAKE_BUILD_TYPE=Release -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_SQL=ON .. && cmake --build . --parallel 4 && make test ARGS="--timeout 300 -j2 --output-on-failure"
   ubuntu_20_04:
     name: "ubuntu-20.04"
     runs-on: ubuntu-20.04
@@ -156,10 +167,11 @@ jobs:
       - id: install_deps
         run: |
           sudo apt update
-          sudo apt install -y ccache libfl-dev libpcap-dev libboost-all-dev
+          sudo apt install -y ccache libfl-dev libpcap-dev libboost-all-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so
           echo "PATH=/usr/lib/ccache:$PATH" >> $GITHUB_ENV
       - id: build
-        run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DCMAKE_BUILD_TYPE=Release -DENABLE_BUSTACHE=ON -DENABLE_SQLITE=ON -DENABLE_PCAP=ON -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && make -j4 VERBOSE=1  && make test ARGS="--timeout 300 -j2 --output-on-failure"
+        run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DCMAKE_BUILD_TYPE=Release -DENABLE_BUSTACHE=ON -DENABLE_SQL=ON -DENABLE_PCAP=ON -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && make -j4 VERBOSE=1  && make test ARGS="--timeout 300 -j2 --output-on-failure"
   ubuntu_20_04_all_clang:
     name: "ubuntu-20.04-all-clang"
     runs-on: ubuntu-20.04
@@ -178,7 +190,8 @@ jobs:
       - id: install_deps
         run: |
           sudo apt update
-          sudo apt install -y ccache libfl-dev libpcap-dev libboost-all-dev openjdk-8-jdk maven libusb-1.0-0-dev libpng-dev libgps-dev
+          sudo apt install -y ccache libfl-dev libpcap-dev libboost-all-dev openjdk-8-jdk maven libusb-1.0-0-dev libpng-dev libgps-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so
           echo "PATH=/usr/lib/ccache:$PATH" >> $GITHUB_ENV
       - id: build
         run: ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DCMAKE_BUILD_TYPE=Release -DENABLE_JNI=ON -DENABLE_SENSORS=ON -DENABLE_OPENWSMAN=ON -DENABLE_OPENCV=ON -DENABLE_MQTT=ON -DENABLE_GPS=ON -DENABLE_USB_CAMERA=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_OPC=ON -DENABLE_SFTP=ON -DENABLE_MQTT=ON -DENABLE_COAP=ON -DENABLE_PYTHON=ON -DENABLE_SQL=ON -DENABLE_AWS=ON -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ .. &&  cmake - [...]
@@ -201,7 +214,8 @@ jobs:
         run: |
           sudo apt-add-repository -y "ppa:ubuntu-toolchain-r/test"
           sudo apt update
-          sudo apt install -y ccache openjdk-8-jdk maven libusb-1.0-0-dev libpng12-dev libgps-dev
+          sudo apt install -y ccache openjdk-8-jdk maven libusb-1.0-0-dev libpng12-dev libgps-dev libsqliteodbc
+          sudo ln -s /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so /usr/lib/x86_64-linux-gnu/libsqlite3odbc.so
           echo "PATH=/usr/lib/ccache:$PATH" >> $GITHUB_ENV
       - id: build
         run: sudo mount tmpfs -t tmpfs /tmp && ./bootstrap.sh -e -t && cd build  && cmake -DUSE_SHARED_LIBS= -DENABLE_OPENWSMAN=ON -DENABLE_OPENCV=ON -DENABLE_MQTT=ON -DENABLE_GPS=ON -DENABLE_USB_CAMERA=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_OPC=ON -DENABLE_SFTP=ON -DENABLE_MQTT=ON -DENABLE_COAP=ON -DENABLE_PYTHON=ON -DENABLE_SQL=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. &&  cmake --build . --parallel 4  && make test ARGS="--timeout 300 -j8 --out [...]
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 17f94f2..d9efff2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -431,7 +431,7 @@ endif(WIN32)
 
 option(ENABLE_SQL "Enables the SQL Suite of Tools." OFF)
 if (ENABLE_ALL OR ENABLE_SQL)
-	createExtension(SQL-EXTENSIONS "SQL EXTENSIONS" "Enables the SQL Suite of Tools" "extensions/sql")
+	createExtension(SQL-EXTENSIONS "SQL EXTENSIONS" "Enables the SQL Suite of Tools" "extensions/sql" "${TEST_DIR}/sql-tests")
 endif()
 
 ## Create MQTT Extension
@@ -478,18 +478,6 @@ if(ENABLE_ALL OR ENABLE_SENSORS)
 	createExtension(SENSOR-EXTENSIONS "SENSOR EXTENSIONS" "Enables the package of sensor extensions." "extensions/sensors" "${TEST_DIR}/sensors-tests")
 endif()
 
-## SQLite extensions
-option(ENABLE_SQLITE "Disables the scripting extensions." OFF)
-if (ENABLE_SQLITE)
-	include(BundledSQLite)
-	use_bundled_sqlite(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
-	createExtension(SQLITE-EXTENSIONS "SQLITE EXTENSIONS" "This enables sqlite" "extensions/sqlite" "${TEST_DIR}/sqlite-tests")
-endif()
-
-if (ENABLE_SQL AND ENABLE_SQLITE)
-	message(FATAL_ERROR "ENABLE_SQL and ENABLE_SQLITE are incompatible. Set only one at a time.")
-endif()
-
 ## USB camera extensions
 option(ENABLE_USB_CAMERA "Enables USB camera support." OFF)
 if (ENABLE_ALL OR ENABLE_USB_CAMERA)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index af3b1e2..0306c42 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -44,6 +44,7 @@
 - [PutS3Object](#puts3object)
 - [PutSFTP](#putsftp)
 - [PutSQL](#putsql)
+- [QueryDatabaseTable](#querydatabasetable)
 - [RetryFlowFile](#retryflowfile)
 - [RouteOnAttribute](#routeonattribute)
 - [TailFile](#tailfile)
@@ -333,15 +334,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 | Name | Default Value | Allowable Values | Description |
 | - | - | - | - |
-|Connection URL|||The database URL to connect to|
-|SQL Statement|||The SQL statement to execute|
+|**DB Controller Service**|||Database Controller Service.|
+|**Max Rows Per Flow File**|0||The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.|
+|**Output Format**|JSON-Pretty|JSON<br/>JSON-Pretty|Set the output format type.|
+|SQL select query|||The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database.|
 ### Relationships
 
 | Name | Description |
 | - | - |
-|failure|Failures which will not work if retried|
-|original|The original FlowFile is sent here|
-|success|After a successful SQL execution, result FlowFiles are sent here|
+|success|Successfully created FlowFile from SQL query result set.|
 
 
 ## ExecuteScript
@@ -1211,16 +1212,42 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 | Name | Default Value | Allowable Values | Description |
 | - | - | - | - |
-|Batch Size|1||The maximum number of flow files to process in one batch|
-|Connection URL|||The database URL to connect to|
-|SQL Statement|||The SQL statement to execute|
+|**DB Controller Service**|||Database Controller Service.|
+|SQL Statement|||The SQL statement to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of the incoming flow file is expected to contain a valid SQL statement, to be issued by the processor to the database.|
 ### Relationships
 
 | Name | Description |
 | - | - |
-|failure|Failures which will not work if retried|
-|retry|Failures which might work if retried|
-|success|After a successful put SQL operation, FlowFiles are sent here|
+|success|After a successful SQL update operation, the incoming FlowFile sent here|
+
+## QueryDatabaseTable
+
+### Description
+
+Fetches all rows of a table, whose values in the specified Maximum-value Columns are larger than the previously-seen maxima. If that property is not provided, all rows are returned. The rows are grouped according to the value of Max Rows Per Flow File property and formatted as JSON.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Columns to Return|||A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. If no column names are supplied, all columns in the specified table will be returned. NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.|
+|**DB Controller Service**|||Database Controller Service.|
+|**Max Rows Per Flow File**|0||The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.|
+|Maximum-value Columns|||A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables.|
+|**Output Format**|JSON-Pretty|JSON<br/>JSON-Pretty|Set the output format type.|
+|**Table Name**|||The name of the database table to be queried.|
+|Where Clause|||A custom clause to be added in the WHERE condition when building SQL queries.|
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Successfully created FlowFile from SQL query result set.|
+### Dynamic Properties:
+
+| Name | Value | Description |
+| - | - | - |
+|initial.maxvalue.<max_value_column>|Initial maximum value for the specified column|Specifies an initial max value for max value column(s). Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).<br/>**Supports Expression Language: true**|
 
 
 ## RetryFlowFile
diff --git a/README.md b/README.md
index 8b8f1d6..4d536ef 100644
--- a/README.md
+++ b/README.md
@@ -89,8 +89,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
 | Scripting | [ExecuteScript](PROCESSORS.md#executescript)<br/>**Custom Python Processors**     |    -DDISABLE_SCRIPTING=ON  |
 | Sensors | GetEnvironmentalSensors<br/>GetMovementSensors | -DENABLE_SENSORS=ON |
 | SFTP | [FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp) | -DENABLE_SFTP=ON |
-| SQL | ExecuteSQL<br/>PutSQL<br/>QueryDatabaseTable<br/> | -DENABLE_SQL=ON  |
-| SQLite | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)      |    -DENABLE_SQLITE=ON  |
+| SQL | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/> | -DENABLE_SQL=ON  |
 | Tensorflow | TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/>      |    -DENABLE_TENSORFLOW=ON  |
 | USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera)     |    -DENABLE_USB_CAMERA=ON  |
 | Windows Event Log (Windows only) | CollectorInitiatedSubscription<br/>ConsumeWindowsEventLog<br/>TailEventLog | -DENABLE_WEL=ON |
diff --git a/bootstrap.sh b/bootstrap.sh
index 2ece4cc..8a67035 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -304,10 +304,7 @@ add_disabled_option OPENCV_ENABLED ${FALSE} "ENABLE_OPENCV"
 add_disabled_option SFTP_ENABLED ${FALSE} "ENABLE_SFTP"
 add_dependency SFTP_ENABLED "libssh2"
 
-add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
-
 add_disabled_option SQL_ENABLED ${FALSE} "ENABLE_SQL"
-set_incompatible_with SQL_ENABLED SQLITE_ENABLED
 
 add_disabled_option OPENWSMAN_ENABLED ${FALSE} "ENABLE_OPENWSMAN"
 
@@ -325,9 +322,9 @@ add_dependency OPC_ENABLED "mbedtls"
 add_disabled_option AZURE_ENABLED ${FALSE} "ENABLE_AZURE"
 
 USE_SHARED_LIBS=${TRUE}
-TESTS_DISABLED=${FALSE}
 ASAN_ENABLED=${FALSE}
 FAIL_ON_WARNINGS=${FALSE}
+TESTS_ENABLED=${TRUE}
 
 ## name, default, values
 add_multi_option BUILD_PROFILE "RelWithDebInfo" "RelWithDebInfo" "Debug" "MinSizeRel" "Release"
@@ -343,7 +340,7 @@ OVERRIDE_BUILD_IDENTIFIER=${BUILD_IDENTIFIER}
 load_state
 
 if [ "$USER_DISABLE_TESTS" == "${TRUE}" ]; then
-   ToggleFeature TESTS_DISABLED
+   ToggleFeature TESTS_ENABLED
 fi
 
 
@@ -457,11 +454,11 @@ build_cmake_command(){
     CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -DCMAKE_BUILD_TYPE=RelWithDebInfo"
   fi
 
-  if [ "${TESTS_DISABLED}" = "${TRUE}" ]; then
-    CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -DSKIP_TESTS=true "
-  else
+  if [ "${TESTS_ENABLED}" = "${TRUE}" ]; then
     # user may have disabled tests previously, so let's force them to be re-enabled
     CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -DSKIP_TESTS= "
+  else
+    CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -DSKIP_TESTS=true "
   fi
 
   if [ "${ASAN_ENABLED}" = "${TRUE}" ]; then
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index f4648ed..29c8cb5 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -166,7 +166,7 @@ save_state(){
   echo "VERSION=1" > ${script_directory}/bt_state
   echo_state_variable BUILD_IDENTIFIER
   echo_state_variable BUILD_DIR
-  echo_state_variable TESTS_DISABLED
+  echo_state_variable TESTS_ENABLED
   echo_state_variable BUILD_PROFILE
   echo_state_variable USE_SHARED_LIBS
   echo_state_variable ASAN_ENABLED
@@ -358,20 +358,19 @@ show_supported_features() {
   echo "J. TensorFlow Support ..........$(print_feature_status TENSORFLOW_ENABLED)"
   echo "K. Bustache Support ............$(print_feature_status BUSTACHE_ENABLED)"
   echo "L. MQTT Support ................$(print_feature_status MQTT_ENABLED)"
-  echo "M. SQLite Support ..............$(print_feature_status SQLITE_ENABLED)"
-  echo "N. Python Support ..............$(print_feature_status PYTHON_ENABLED)"
-  echo "O. COAP Support ................$(print_feature_status COAP_ENABLED)"
-  echo "S. SFTP Support ................$(print_feature_status SFTP_ENABLED)"
-  echo "V. AWS Support .................$(print_feature_status AWS_ENABLED)"
+  echo "M. Python Support ..............$(print_feature_status PYTHON_ENABLED)"
+  echo "N. COAP Support ................$(print_feature_status COAP_ENABLED)"
+  echo "O. SFTP Support ................$(print_feature_status SFTP_ENABLED)"
+  echo "S. AWS Support .................$(print_feature_status AWS_ENABLED)"
   echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
   echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
-  echo "W. SQL Support..................$(print_feature_status SQL_ENABLED)"
-  echo "X. Openwsman Support ...........$(print_feature_status OPENWSMAN_ENABLED)"
-  echo "Y. Azure Support ...............$(print_feature_status AZURE_ENABLED)"
+  echo "V. SQL Support..................$(print_feature_status SQL_ENABLED)"
+  echo "W. Openwsman Support ...........$(print_feature_status OPENWSMAN_ENABLED)"
+  echo "X. Azure Support ...............$(print_feature_status AZURE_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
-  echo "1. Disable Tests ...............$(print_feature_status TESTS_DISABLED)"
+  echo "1. Enable Tests ................$(print_feature_status TESTS_ENABLED)"
   echo "2. Enable all extensions"
   echo "3. Enable JNI Support ..........$(print_feature_status JNI_ENABLED)"
   echo "4. Use Shared Dependency Links .$(print_feature_status USE_SHARED_LIBS)"
@@ -405,22 +404,21 @@ read_feature_options(){
     j) ToggleFeature TENSORFLOW_ENABLED ;;
     k) ToggleFeature BUSTACHE_ENABLED ;;
     l) ToggleFeature MQTT_ENABLED ;;
-    m) ToggleFeature SQLITE_ENABLED ;;
-    v) ToggleFeature AWS_ENABLED ;;
-    n) if [ "$USE_SHARED_LIBS" = "${TRUE}" ]; then
+    m) if [ "$USE_SHARED_LIBS" = "${TRUE}" ]; then
          ToggleFeature PYTHON_ENABLED
        else
          echo -e "${RED}Please ensure static linking is enabled for Python Support...${NO_COLOR}" && sleep 2
    	   fi
    	   ;;
-    o) ToggleFeature COAP_ENABLED ;;
-	s) ToggleFeature SFTP_ENABLED ;;
+    n) ToggleFeature COAP_ENABLED ;;
+    o) ToggleFeature SFTP_ENABLED ;;
+    s) ToggleFeature AWS_ENABLED ;;
     t) ToggleFeature OPENCV_ENABLED ;;
     u) ToggleFeature OPC_ENABLED ;;
-    w) ToggleFeature SQL_ENABLED ;;
-    x) ToggleFeature OPENWSMAN_ENABLED ;;
-    y) ToggleFeature AZURE_ENABLED ;;
-    1) ToggleFeature TESTS_DISABLED ;;
+    v) ToggleFeature SQL_ENABLED ;;
+    w) ToggleFeature OPENWSMAN_ENABLED ;;
+    x) ToggleFeature AZURE_ENABLED ;;
+    1) ToggleFeature TESTS_ENABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
     4) if [ "$PYTHON_ENABLED" = "${FALSE}" ]; then
@@ -438,7 +436,7 @@ read_feature_options(){
       fi
       ;;
     q) exit 0;;
-    *) echo -e "${RED}Please enter an option A-Y or 1-6...${NO_COLOR}" && sleep 2
+    *) echo -e "${RED}Please enter an option A-X or 1-7...${NO_COLOR}" && sleep 2
   esac
 }
 
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 308e2be..c930600 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -37,7 +37,6 @@ add_custom_target(
         -c ENABLE_PCAP=${ENABLE_PCAP}
         -c ENABLE_LIBRDKAFKA=${ENABLE_LIBRDKAFKA}
         -c ENABLE_SENSORS=${ENABLE_SENSORS}
-        -c ENABLE_SQLITE=${ENABLE_SQLITE}
         -c ENABLE_USB_CAMERA=${ENABLE_USB_CAMERA}
         -c ENABLE_TENSORFLOW=${ENABLE_TENSORFLOW}
         -c ENABLE_AWS=${ENABLE_AWS}
diff --git a/docker/DockerBuild.sh b/docker/DockerBuild.sh
index a30fca7..da56226 100755
--- a/docker/DockerBuild.sh
+++ b/docker/DockerBuild.sh
@@ -42,7 +42,6 @@ ENABLE_MQTT=${ENABLE_MQTT:-}
 ENABLE_PCAP=${ENABLE_PCAP:-}
 ENABLE_LIBRDKAFKA=${ENABLE_LIBRDKAFKA:-}
 ENABLE_SENSORS=${ENABLE_SENSORS:-}
-ENABLE_SQLITE=${ENABLE_SQLITE:-}
 ENABLE_USB_CAMERA=${ENABLE_USB_CAMERA:-}
 ENABLE_TENSORFLOW=${ENABLE_TENSORFLOW:-}
 ENABLE_AWS=${ENABLE_AWS:-}
@@ -180,7 +179,6 @@ BUILD_ARGS="--build-arg UID=${UID_ARG} \
             --build-arg ENABLE_PCAP=${ENABLE_PCAP} \
             --build-arg ENABLE_LIBRDKAFKA=${ENABLE_LIBRDKAFKA} \
             --build-arg ENABLE_SENSORS=${ENABLE_SENSORS} \
-            --build-arg ENABLE_SQLITE=${ENABLE_SQLITE} \
             --build-arg ENABLE_USB_CAMERA=${ENABLE_USB_CAMERA} \
             --build-arg ENABLE_TENSORFLOW=${ENABLE_TENSORFLOW} \
             --build-arg ENABLE_AWS=${ENABLE_AWS} \
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 37541b4..31f6197 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -99,7 +99,6 @@ ARG ENABLE_MQTT
 ARG ENABLE_PCAP
 ARG ENABLE_LIBRDKAFKA
 ARG ENABLE_SENSORS
-ARG ENABLE_SQLITE
 ARG ENABLE_USB_CAMERA
 ARG ENABLE_TENSORFLOW
 ARG ENABLE_AWS
@@ -124,7 +123,7 @@ RUN cd ${MINIFI_BASE_DIR} \
   && cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL=${ENABLE_ALL} -DENABLE_PYTHON=${ENABLE_PYTHON} -DENABLE_OPS=${ENABLE_OPS} \
     -DENABLE_JNI=${ENABLE_JNI} -DENABLE_OPENCV=${ENABLE_OPENCV} -DENABLE_OPC=${ENABLE_OPC} -DENABLE_GPS=${ENABLE_GPS} -DENABLE_COAP=${ENABLE_COAP} \
     -DENABLE_WEL=${ENABLE_WEL} -DENABLE_SQL=${ENABLE_SQL} -DENABLE_MQTT=${ENABLE_MQTT} -DENABLE_PCAP=${ENABLE_PCAP} \
-    -DENABLE_LIBRDKAFKA=${ENABLE_LIBRDKAFKA} -DENABLE_SENSORS=${ENABLE_SENSORS} -DENABLE_SQLITE=${ENABLE_SQLITE} \
+    -DENABLE_LIBRDKAFKA=${ENABLE_LIBRDKAFKA} -DENABLE_SENSORS=${ENABLE_SENSORS} \
     -DENABLE_USB_CAMERA=${ENABLE_USB_CAMERA} -DENABLE_TENSORFLOW=${ENABLE_TENSORFLOW} -DENABLE_AWS=${ENABLE_AWS} \
     -DENABLE_BUSTACHE=${ENABLE_BUSTACHE} -DENABLE_SFTP=${ENABLE_SFTP} -DENABLE_OPENWSMAN=${ENABLE_OPENWSMAN} -DENABLE_AZURE=${ENABLE_AZURE} \
     -DDISABLE_CURL=${DISABLE_CURL} -DDISABLE_JEMALLOC=${DISABLE_JEMALLOC} -DDISABLE_CIVET=${DISABLE_CIVET} \
diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp
index 2882302..cd69265 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -30,7 +30,9 @@ bool ProcessContextExpr::getProperty(const Property &property, std::string &valu
   auto name = property.getName();
   if (expressions_.find(name) == expressions_.end()) {
     std::string expression_str;
-    ProcessContext::getProperty(name, expression_str);
+    if (!ProcessContext::getProperty(name, expression_str)) {
+      return false;
+    }
     logger_->log_debug("Compiling expression for %s/%s: %s", getProcessorNode()->getName(), name, expression_str);
     expressions_.emplace(name, expression::compile(expression_str));
   }
diff --git a/extensions/sql/CMakeLists.txt b/extensions/sql/CMakeLists.txt
index c649376..e5e05de 100644
--- a/extensions/sql/CMakeLists.txt
+++ b/extensions/sql/CMakeLists.txt
@@ -43,16 +43,15 @@ else()
 	# Build project
 	ExternalProject_Add(
 			iodbc-external
-			URL "https://github.com/openlink/iODBC/archive/v3.52.13.tar.gz"
-			URL_HASH "SHA256=4bf67fc6d4d237a4db19b292b5dd255ee09a0b2daa4e4058cf3a918bc5102135"
+			URL "https://github.com/openlink/iODBC/archive/v3.52.14.tar.gz"
+			URL_HASH "SHA256=896d7e16b283cf9a6f5b5f46e8e9549aef21a11935726b0170987cd4c59d16db"
 			BUILD_IN_SOURCE true
 			SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/iodbc-src"
 			BUILD_COMMAND make
 			CMAKE_COMMAND ""
 			UPDATE_COMMAND ""
 			INSTALL_COMMAND make install
-			CONFIGURE_COMMAND ""
-			PATCH_COMMAND ./autogen.sh && ./configure --prefix=${IODBC_BYPRODUCT_DIR}
+			CONFIGURE_COMMAND ./autogen.sh && ./configure --prefix=${IODBC_BYPRODUCT_DIR}
 			STEP_TARGETS build
 			BUILD_BYPRODUCTS "${IODBC_BYPRODUCT_DIR}/${IODBC_BYPRODUCT}"
 			EXCLUDE_FROM_ALL TRUE
@@ -77,12 +76,6 @@ endif()
 
 # Build SOCI
 
-# Find patch executable
-find_package(Patch)
-
-# Define patch step
-set(PC "${Patch_EXECUTABLE}" -p1 -i "${CMAKE_CURRENT_SOURCE_DIR}/patch/soci.patch")
-
 # Define byproducts
 # This should be based on GNUInstallDirs, but it's done wrong in Soci:
 # https://github.com/SOCI/soci/blob/release/4.0/CMakeLists.txt#L140
@@ -109,15 +102,28 @@ foreach(SOCI_BYPRODUCT ${SOCI_BYPRODUCTS})
 	list(APPEND SOCI_LIBRARIES_LIST "${SOCI_BYPRODUCT_DIR}/${SOCI_BYPRODUCT}")
 endforeach(SOCI_BYPRODUCT)
 
-# Set build options
-set(SOCI_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
-		"-DCMAKE_INSTALL_PREFIX=${SOCI_BYPRODUCT_DIR}"
-		"-DSOCI_TESTS=OFF"
-		"-DSOCI_SHARED=OFF"
-		"-DSOCI_CXX_C11=ON"
-		"-DWITH_ODBC=ON"
-		"-DSOCI_ODBC=ON"
-		"-DWITH_BOOST=OFF")
+if(WIN32)
+	# Set build options
+	set(SOCI_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+			"-DCMAKE_INSTALL_PREFIX=${SOCI_BYPRODUCT_DIR}"
+			"-DSOCI_TESTS=OFF"
+			"-DSOCI_SHARED=OFF"
+			"-DSOCI_CXX_C11=ON"
+			"-DWITH_ODBC=ON"
+			"-DWITH_BOOST=OFF")
+else()
+	# SOCI has its own FindODBC.cmake file
+	# Set build options
+	set(SOCI_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+			"-DCMAKE_INSTALL_PREFIX=${SOCI_BYPRODUCT_DIR}"
+			"-DSOCI_TESTS=OFF"
+			"-DSOCI_SHARED=OFF"
+			"-DSOCI_CXX_C11=ON"
+			"-DSOCI_ODBC=ON"
+			"-DODBC_INCLUDE_DIR=${IODBC_INCLUDE_DIRS}"
+			"-DODBC_LIBRARY=${IODBC_LIBRARIES}"
+			"-DWITH_BOOST=OFF")
+endif()
 
 if(NOT WIN32)
 	list(APPEND SOCI_CMAKE_ARGS "-DCMAKE_MODULE_PATH=${CMAKE_CURRENT_SOURCE_DIR}/cmake/"
@@ -128,9 +134,8 @@ endif()
 # Build project
 ExternalProject_Add(
 		soci-external
-		URL "https://github.com/SOCI/soci/archive/4.0.0.tar.gz"
-		URL_HASH "SHA256=359b988d8cbe81357835317821919f7e270c0705e41951a92ac1627cb9fe8faf"
-		PATCH_COMMAND ${PC}
+		URL "https://github.com/SOCI/soci/archive/4.0.1.tar.gz"
+		URL_HASH "SHA256=fa69347b1a1ef74450c0382b665a67bd6777cc7005bbe09726479625bcf1e29c"
 		SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/soci-src"
 		CMAKE_ARGS ${SOCI_CMAKE_ARGS}
 		BUILD_BYPRODUCTS ${SOCI_LIBRARIES_LIST}
diff --git a/extensions/sql/SQLLoader.h b/extensions/sql/SQLLoader.h
index b962777..844db5d 100644
--- a/extensions/sql/SQLLoader.h
+++ b/extensions/sql/SQLLoader.h
@@ -22,6 +22,7 @@
 #include "processors/PutSQL.h"
 #include "processors/QueryDatabaseTable.h"
 #include "services/ODBCConnector.h"
+#include "utils/GeneralUtils.h"
 
 class SQLFactory : public core::ObjectFactory {
  public:
@@ -48,7 +49,7 @@ class SQLFactory : public core::ObjectFactory {
 
   template <typename T>
   static std::unique_ptr<ObjectFactory> getObjectFactory() {
-    return std::make_unique<core::DefautObjectFactory<T>>();
+    return utils::make_unique<core::DefautObjectFactory<T>>();
   }
 
   std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override {
diff --git a/extensions/sql/cmake/FindODBC.cmake b/extensions/sql/cmake/FindODBC.cmake
deleted file mode 100644
index b232e76..0000000
--- a/extensions/sql/cmake/FindODBC.cmake
+++ /dev/null
@@ -1,34 +0,0 @@
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-if (NOT ODBC_FOUND)
-    set(ODBC_FOUND "YES" CACHE STRING "" FORCE)
-    set(ODBC_INCLUDE_DIR "${EXPORTED_IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
-    set(ODBC_INCLUDE_DIRS "${EXPORTED_IODBC_INCLUDE_DIRS}" CACHE STRING "" FORCE)
-    set(ODBC_LIBRARIES "${EXPORTED_IODBC_LIBRARIES}" CACHE STRING "" FORCE)
-    set(ODBC_LIBRARY "${EXPORTED_IODBC_LIBRARIES}"  CACHE STRING "" FORCE)
-endif()
-
-if(NOT TARGET ODBC::ODBC)
-    add_library(ODBC::ODBC STATIC IMPORTED)
-    set_target_properties(ODBC::ODBC PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${ODBC_INCLUDE_DIRS}")
-    set_target_properties(ODBC::ODBC PROPERTIES
-        IMPORTED_LINK_INTERFACE_LANGUAGES "C"
-        IMPORTED_LOCATION "${ODBC_LIBRARIES}")
-endif()
\ No newline at end of file
diff --git a/extensions/sql/data/DatabaseConnectors.h b/extensions/sql/data/DatabaseConnectors.h
index 3328739..e9e6437 100644
--- a/extensions/sql/data/DatabaseConnectors.h
+++ b/extensions/sql/data/DatabaseConnectors.h
@@ -45,8 +45,13 @@ class Statement {
 
   virtual ~Statement() = default;
 
-  soci::rowset<soci::row> execute() {
-    return session_.prepare << query_;
+  soci::rowset<soci::row> execute(const std::vector<std::string>& args = {}) {
+    auto stmt = session_.prepare << query_;
+    for (auto& arg : args) {
+      // binds arguments to the prepared statement
+      stmt.operator,(soci::use(arg));
+    }
+    return stmt;
   }
 
  protected:
diff --git a/extensions/sql/data/JSONSQLWriter.cpp b/extensions/sql/data/JSONSQLWriter.cpp
index 6c24e73..2d9fa2e 100644
--- a/extensions/sql/data/JSONSQLWriter.cpp
+++ b/extensions/sql/data/JSONSQLWriter.cpp
@@ -29,53 +29,62 @@ namespace nifi {
 namespace minifi {
 namespace sql {
 
-JSONSQLWriter::JSONSQLWriter(bool pretty)
-  : pretty_(pretty), jsonPayload_(rapidjson::kArrayType) {
+JSONSQLWriter::JSONSQLWriter(bool pretty, ColumnFilter column_filter)
+  : pretty_(pretty), current_batch_(rapidjson::kArrayType), column_filter_(std::move(column_filter)) {
 }
 
-JSONSQLWriter::~JSONSQLWriter() = default;
-
 void JSONSQLWriter::beginProcessRow() {
-  jsonRow_ = rapidjson::kObjectType;
+  current_row_ = rapidjson::kObjectType;
 }
 
 void JSONSQLWriter::endProcessRow() {
-  jsonPayload_.PushBack(jsonRow_, jsonPayload_.GetAllocator());
+  current_batch_.PushBack(current_row_, current_batch_.GetAllocator());
+}
+
+void JSONSQLWriter::beginProcessBatch() {
+  current_batch_ = rapidjson::Document(rapidjson::kArrayType);
 }
 
-void JSONSQLWriter::processColumnName(const std::string& /*name*/) {}
+void JSONSQLWriter::endProcessBatch() {}
+
+void JSONSQLWriter::finishProcessing() {}
+
+void JSONSQLWriter::processColumnNames(const std::vector<std::string>& /*name*/) {}
 
 void JSONSQLWriter::processColumn(const std::string& name, const std::string& value) {
   addToJSONRow(name, toJSONString(value));
 }
 
 void JSONSQLWriter::processColumn(const std::string& name, double value) {
-  addToJSONRow(name, std::move(rapidjson::Value().SetDouble(value)));
+  addToJSONRow(name, rapidjson::Value(value));
 }
 
 void JSONSQLWriter::processColumn(const std::string& name, int value) {
-  addToJSONRow(name, std::move(rapidjson::Value().SetInt(value)));
+  addToJSONRow(name, rapidjson::Value(value));
 }
 
 void JSONSQLWriter::processColumn(const std::string& name, long long value) {
-  addToJSONRow(name, std::move(rapidjson::Value().SetInt64(value)));
+  addToJSONRow(name, rapidjson::Value(gsl::narrow<int64_t>(value)));
 }
 
 void JSONSQLWriter::processColumn(const std::string& name, unsigned long long value) {
-  addToJSONRow(name, std::move(rapidjson::Value().SetUint64(value)));
+  addToJSONRow(name, rapidjson::Value(gsl::narrow<uint64_t>(value)));
 }
 
 void JSONSQLWriter::processColumn(const std::string& name, const char* value) {
   addToJSONRow(name, toJSONString(value));
 }
 
-void JSONSQLWriter::addToJSONRow(const std::string& columnName, rapidjson::Value&& jsonValue) {
-  jsonRow_.AddMember(toJSONString(columnName), std::move(jsonValue), jsonPayload_.GetAllocator());
+void JSONSQLWriter::addToJSONRow(const std::string& column_name, rapidjson::Value&& json_value) {
+  if (!column_filter_(column_name)) {
+    return;
+  }
+  current_row_.AddMember(toJSONString(column_name), std::move(json_value), current_batch_.GetAllocator());
 }
 
 rapidjson::Value JSONSQLWriter::toJSONString(const std::string& s) {
   rapidjson::Value jsonValue;
-  jsonValue.SetString(s.c_str(), s.size(), jsonPayload_.GetAllocator());
+  jsonValue.SetString(s.c_str(), s.size(), current_batch_.GetAllocator());
 
   return jsonValue;
 }
@@ -85,23 +94,17 @@ std::string JSONSQLWriter::toString() {
 
   if (pretty_) {
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
-    jsonPayload_.Accept(writer);
+    current_batch_.Accept(writer);
   } else {
     rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
-    jsonPayload_.Accept(writer);
+    current_batch_.Accept(writer);
   }
 
-  std::stringstream outputStream;
-  outputStream << buffer.GetString();
-
-  jsonPayload_ = rapidjson::Document(rapidjson::kArrayType);
-
-  return outputStream.str();
+  return {buffer.GetString(), buffer.GetSize()};
 }
 
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
+}  // namespace sql
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/data/JSONSQLWriter.h b/extensions/sql/data/JSONSQLWriter.h
index 6888527..f01f304 100644
--- a/extensions/sql/data/JSONSQLWriter.h
+++ b/extensions/sql/data/JSONSQLWriter.h
@@ -18,6 +18,8 @@
 
 #pragma once
 
+#include <functional>
+
 #include "rapidjson/document.h"
 
 #include "SQLWriter.h"
@@ -30,15 +32,19 @@ namespace sql {
 
 class JSONSQLWriter: public SQLWriter {
  public:
-  explicit JSONSQLWriter(bool pretty);
-  virtual ~JSONSQLWriter();
+  using ColumnFilter = std::function<bool(const std::string&)>;
+
+  explicit JSONSQLWriter(bool pretty, ColumnFilter column_filter = [] (const std::string&) {return true;});
 
   std::string toString() override;
 
 private:
+  void beginProcessBatch() override;
+  void endProcessBatch() override;
   void beginProcessRow() override;
   void endProcessRow() override;
-  void processColumnName(const std::string& name) override;
+  void finishProcessing() override;
+  void processColumnNames(const std::vector<std::string>& name) override;
   void processColumn(const std::string& name, const std::string& value) override;
   void processColumn(const std::string& name, double value) override;
   void processColumn(const std::string& name, int value) override;
@@ -52,8 +58,9 @@ private:
 
  private:
   bool pretty_;
-  rapidjson::Document jsonPayload_;
-  rapidjson::Value jsonRow_;
+  rapidjson::Document current_batch_;
+  rapidjson::Value current_row_;
+  ColumnFilter column_filter_;
 };
 
 } /* namespace sql */
diff --git a/extensions/sql/data/MaxCollector.h b/extensions/sql/data/MaxCollector.h
index 8e38959..c3c4b9e 100644
--- a/extensions/sql/data/MaxCollector.h
+++ b/extensions/sql/data/MaxCollector.h
@@ -21,6 +21,8 @@
 #include <string>
 #include <unordered_map>
 #include <tuple>
+#include <vector>
+#include <sstream>
 
 #include "SQLRowSubscriber.h"
 
@@ -31,26 +33,20 @@ namespace minifi {
 namespace sql {
 
 class MaxCollector: public SQLRowSubscriber {
+  void beginProcessBatch() override {}
+  void endProcessBatch() override {}
   void beginProcessRow() override {}
-
-  void endProcessRow() override {
-    if (columnsVerified_) {
-      return;
-    }
-
-    if (countColumns_ != mapState_.size())
-      throw minifi::Exception(PROCESSOR_EXCEPTION, "MaxCollector: Column(s) '" + maxValueColumnNames_ + "' are not found in the columns of '" + selectQuery_ + "' result.");
-
-    columnsVerified_ = true;
+  void endProcessRow() override {}
+  void finishProcessing() override {
+    updateMapState();
   }
 
-  void processColumnName(const std::string& name) override {
-    if (columnsVerified_) {
-      return;
-    }
-
-    if (mapState_.count(name)) {
-      countColumns_++;
+  void processColumnNames(const std::vector<std::string>& names) override {
+    for (const auto& expected : state_) {
+      if (std::find(names.begin(), names.end(), expected.first) == names.end()) {
+        throw minifi::Exception(PROCESSOR_EXCEPTION,
+          "Column '" + expected.first + "' is not found in the columns of '" + query_ + "' result.");
+      }
     }
   }
 
@@ -77,11 +73,12 @@ class MaxCollector: public SQLRowSubscriber {
   void processColumn(const std::string& /*name*/, const char* /*value*/) override {}
 
   template <typename T>
-  struct MaxValue {
-    void updateMaxValue(const std::string& name, const T& value) {
-      const auto it = mapColumnNameValue_.find(name);
-      if (it == mapColumnNameValue_.end()) {
-        mapColumnNameValue_.insert({ name, value });
+  class MaxValue {
+   public:
+    void updateMaxValue(const std::string& column, const T& value) {
+      const auto it = column_maxima.find(column);
+      if (it == column_maxima.end()) {
+        column_maxima.emplace(column, value);
       } else {
         if (value > it->second) {
           it->second = value;
@@ -89,67 +86,53 @@ class MaxCollector: public SQLRowSubscriber {
       }
     }
 
-    std::unordered_map<std::string, T> mapColumnNameValue_;
-  };
-
-  template <typename Tuple, int Index>
-  struct UpdateMapState {
-    UpdateMapState(const Tuple& tpl, std::unordered_map<std::string, std::string>& mapState) {
-      for (auto& el : mapState) {
-        const auto& maxVal = std::get<Index>(tpl);
-
-        const auto it = maxVal.mapColumnNameValue_.find(el.first);
-        if (it != maxVal.mapColumnNameValue_.end()) {
+   protected:
+    void updateStateImpl(std::unordered_map<std::string, std::string>& state) const {
+      for (auto& curr_column_max : state) {
+        const auto it = column_maxima.find(curr_column_max.first);
+        if (it != column_maxima.end()) {
           std::stringstream ss;
           ss << it->second;
-          el.second = ss.str();
+          curr_column_max.second = ss.str();
         }
       }
-
-      UpdateMapState<Tuple, Index - 1>(tpl, mapState);
     }
-  };
 
-  template <typename Tuple>
-  struct UpdateMapState<Tuple, -1> {
-    UpdateMapState(const Tuple&, std::unordered_map<std::string, std::string>&) {}
+   private:
+    std::unordered_map<std::string, T> column_maxima;
   };
 
   template <typename ...Ts>
-  struct MaxValues : public std::tuple<MaxValue<Ts>...> {
-    constexpr static size_t size = sizeof...(Ts);
+  struct MaxValues : public MaxValue<Ts>... {
+    void updateState(std::unordered_map<std::string, std::string>& state) const {
+      (void)(std::initializer_list<int>{(MaxValue<Ts>::updateStateImpl(state), 0)...});
+    }
   };
 
  public:
-  MaxCollector(const std::string& selectQuery, const std::string& maxValueColumnNames, std::unordered_map<std::string, std::string>& mapState)
-    :selectQuery_(selectQuery), maxValueColumnNames_(maxValueColumnNames), mapState_(mapState) {
+  MaxCollector(std::string query, std::unordered_map<std::string, std::string>& state)
+    :query_(std::move(query)), state_(state) {
   }
 
   template <typename T>
-  void updateMaxValue(const std::string& columnName, const T& value) {
-    if (mapState_.count(columnName)) {
-      std::get<MaxValue<T>>(maxValues_).updateMaxValue(columnName, value);
+  void updateMaxValue(const std::string& column_name, const T& value) {
+    if (state_.count(column_name)) {
+      max_values_.MaxValue<T>::updateMaxValue(column_name, value);
     }
   }
 
-  bool updateMapState() {
-    auto mapState = mapState_;
-    UpdateMapState<decltype(maxValues_), decltype(maxValues_)::size - 1>(maxValues_, mapState_);
-
-    return mapState != mapState_;
+  void updateMapState() {
+    max_values_.updateState(state_);
   }
 
  private:
-  const std::string selectQuery_;
-  const std::string maxValueColumnNames_;
-  std::unordered_map<std::string, std::string>& mapState_;
-  MaxValues<std::string, double, int, long long, unsigned long long> maxValues_;
-  size_t countColumns_{};
-  bool columnsVerified_{false};
+  const std::string query_;
+  std::unordered_map<std::string, std::string>& state_;
+  MaxValues<std::string, double, int, long long, unsigned long long> max_values_;
 };
-
-} /* namespace sql */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+  
+}  // namespace sql
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/data/SQLRowSubscriber.h b/extensions/sql/data/SQLRowSubscriber.h
index 89a2f4a..68f90a3 100644
--- a/extensions/sql/data/SQLRowSubscriber.h
+++ b/extensions/sql/data/SQLRowSubscriber.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include <string>
+#include <vector>
 
 namespace org {
 namespace apache {
@@ -28,9 +29,12 @@ namespace sql {
 
 struct SQLRowSubscriber {
   virtual ~SQLRowSubscriber() = default;
+  virtual void beginProcessBatch() = 0;
+  virtual void endProcessBatch() = 0;
   virtual void beginProcessRow() = 0;
   virtual void endProcessRow() = 0;
-  virtual void processColumnName(const std::string& name) = 0;
+  virtual void finishProcessing() = 0;
+  virtual void processColumnNames(const std::vector<std::string>& names) = 0;
   virtual void processColumn(const std::string& name, const std::string& value) = 0;
   virtual void processColumn(const std::string& name, double value) = 0;
   virtual void processColumn(const std::string& name, int value) = 0;
diff --git a/extensions/sql/data/SQLRowsetProcessor.cpp b/extensions/sql/data/SQLRowsetProcessor.cpp
index 1339b9d..136385d 100644
--- a/extensions/sql/data/SQLRowsetProcessor.cpp
+++ b/extensions/sql/data/SQLRowsetProcessor.cpp
@@ -20,6 +20,7 @@
 
 #include "Exception.h"
 #include "Utils.h"
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
@@ -27,44 +28,57 @@ namespace nifi {
 namespace minifi {
 namespace sql {
 
-SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers)
-  : rowset_(rowset), rowSubscribers_(rowSubscribers) {
+SQLRowsetProcessor::SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers)
+  : rowset_(rowset), row_subscribers_(std::move(row_subscribers)) {
   iter_ = rowset_.begin();
 }
 
 size_t SQLRowsetProcessor::process(size_t max) {
   size_t count = 0;
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessBatch();
+  }
+
   for (; iter_ != rowset_.end(); ) {
     addRow(*iter_, count);
     iter_++;
     count++;
-    totalCount_++;
     if (max > 0 && count >= max) {
       break;
     }
   }
 
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().endProcessBatch();
+    if (count == 0) {
+      subscriber.get().finishProcessing();
+    }
+  }
+
   return count;
 }
 
 void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
-  for (const auto& pRowSubscriber : rowSubscribers_) {
-    pRowSubscriber->beginProcessRow();
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().beginProcessRow();
   }
 
   if (rowCount == 0) {
+    std::vector<std::string> column_names;
+    column_names.reserve(row.size());
     for (std::size_t i = 0; i != row.size(); ++i) {
-      for (const auto& pRowSubscriber : rowSubscribers_) {
-        pRowSubscriber->processColumnName(utils::toLower(row.get_properties(i).get_name()));
-      }
+      column_names.push_back(utils::StringUtils::toLower(row.get_properties(i).get_name()));
+    }
+    for (const auto& subscriber : row_subscribers_) {
+      subscriber.get().processColumnNames(column_names);
     }
   }
 
   for (std::size_t i = 0; i != row.size(); ++i) {
     const soci::column_properties& props = row.get_properties(i);
 
-    const auto& name = utils::toLower(props.get_name());
+    const auto& name = utils::StringUtils::toLower(props.get_name());
 
     if (row.get_indicator(i) == soci::i_null) {
       processColumn(name, "NULL");
@@ -107,8 +121,8 @@ void SQLRowsetProcessor::addRow(const soci::row& row, size_t rowCount) {
     }
   }
 
-  for (const auto& pRowSubscriber : rowSubscribers_) {
-    pRowSubscriber->endProcessRow();
+  for (const auto& subscriber : row_subscribers_) {
+    subscriber.get().endProcessRow();
   }
 }
 
diff --git a/extensions/sql/data/SQLRowsetProcessor.h b/extensions/sql/data/SQLRowsetProcessor.h
index 1bd4086..11d245d 100644
--- a/extensions/sql/data/SQLRowsetProcessor.h
+++ b/extensions/sql/data/SQLRowsetProcessor.h
@@ -32,7 +32,7 @@ namespace sql {
 
 class SQLRowsetProcessor {
  public:
-  SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, const std::vector<SQLRowSubscriber*>& rowSubscribers);
+  SQLRowsetProcessor(const soci::rowset<soci::row>& rowset, std::vector<std::reference_wrapper<SQLRowSubscriber>> rowSubscribers);
 
   size_t process(size_t max);
 
@@ -41,16 +41,15 @@ class SQLRowsetProcessor {
 
    template <typename T>
    void processColumn(const std::string& name, const T& value) const {
-     for (const auto& pRowSubscriber: rowSubscribers_) {
-       pRowSubscriber->processColumn(name, value);
+     for (const auto& subscriber: row_subscribers_) {
+       subscriber.get().processColumn(name, value);
      }
    }
 
  private:
-  size_t totalCount_{};
   soci::rowset<soci::row>::const_iterator iter_;
   soci::rowset<soci::row> rowset_;
-  std::vector<SQLRowSubscriber*> rowSubscribers_;
+  std::vector<std::reference_wrapper<SQLRowSubscriber>> row_subscribers_;
 };
 
 } /* namespace sql */
diff --git a/extensions/sql/data/SQLWriter.h b/extensions/sql/data/SQLWriter.h
index a329a15..10f445c 100644
--- a/extensions/sql/data/SQLWriter.h
+++ b/extensions/sql/data/SQLWriter.h
@@ -19,7 +19,6 @@
 #pragma once
 
 #include <string>
-#include <iostream>
 
 #include <soci/soci.h>
 
diff --git a/extensions/sql/data/Utils.cpp b/extensions/sql/data/Utils.cpp
index 75530dd..069ec0b 100644
--- a/extensions/sql/data/Utils.cpp
+++ b/extensions/sql/data/Utils.cpp
@@ -18,10 +18,10 @@
 
 #include "Utils.h"
 
-#include <algorithm>
-#include  <cctype>
-#include  <regex>
-#include  <sstream>
+#include <vector>
+#include <string>
+
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
@@ -29,28 +29,14 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string toLower(const std::string& str) {
-  std::string ret;
-
-  // (int(*)(int))std::tolower - to avoid compilation error 'no matching overloaded function found'. 
-  // It is described in https://stackoverflow.com/questions/5539249/why-cant-transforms-begin-s-end-s-begin-tolower-be-complied-successfu.
-  std::transform(str.begin(), str.end(), std::back_inserter(ret), (int(*)(int))std::tolower);
-
-  return ret;
-}
-
 std::vector<std::string> inputStringToList(const std::string& str) {
-  std::vector<std::string> ret;
-
-  std::string token;
-  // Convert to lower and remove white characters.
-  std::istringstream tokenStream(std::regex_replace(toLower(str), std::regex("\\s"), std::string("")));
-
-  while (std::getline(tokenStream, token, ',')) {
-    ret.push_back(token);
+  std::vector<std::string> fragments = StringUtils::split(str, ",");
+  for (auto& item : fragments) {
+    item = StringUtils::toLower(StringUtils::trim(item));
   }
+  fragments.erase(std::remove(fragments.begin(), fragments.end(), ""), fragments.end());
 
-  return ret;
+  return fragments;
 }
 
 } /* namespace utils */
diff --git a/extensions/sql/data/Utils.h b/extensions/sql/data/Utils.h
index fb9758f..d705e76 100644
--- a/extensions/sql/data/Utils.h
+++ b/extensions/sql/data/Utils.h
@@ -27,10 +27,8 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string toLower(const std::string& str);
 std::vector<std::string> inputStringToList(const std::string& str);
 
-
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/sql/patch/soci.patch b/extensions/sql/patch/soci.patch
deleted file mode 100644
index 530571d..0000000
--- a/extensions/sql/patch/soci.patch
+++ /dev/null
@@ -1,18 +0,0 @@
-diff -rupN orig/CMakeLists.txt patched/CMakeLists.txt
---- orig/CMakeLists.txt	2019-11-09 20:08:01.000000000 +0100
-+++ patched/CMakeLists.txt	2019-12-17 15:22:40.000000000 +0100
-@@ -29,8 +29,7 @@ option(SOCI_ASAN "Enable address sanitiz
- ###############################################################################
- 
- # Path to additional CMake modules
--set(CMAKE_MODULE_PATH ${SOCI_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH})
--set(CMAKE_MODULE_PATH ${SOCI_SOURCE_DIR}/cmake/modules ${CMAKE_MODULE_PATH})
-+list(APPEND CMAKE_MODULE_PATH "${SOCI_SOURCE_DIR}/cmake/modules" "${SOCI_SOURCE_DIR}/cmake")
- 
- include(SociUtilities)
- include(SociConfig)
-@@ -204,4 +203,3 @@ endforeach()
- configure_file("${CONFIG_FILE_IN}" "${CONFIG_FILE_OUT}")
- 
- message(STATUS "")
--
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
index 93dac9e..bae2282 100644
--- a/extensions/sql/processors/ExecuteSQL.cpp
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -20,28 +20,18 @@
 
 #include "ExecuteSQL.h"
 
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sstream>
-#include <stdio.h>
 #include <string>
-#include <iostream>
 #include <memory>
-#include <codecvt>
 
 #include <soci/soci.h>
 
 #include "io/BufferStream.h"
+#include "io/StreamPipe.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
-#include "utils/OsUtils.h"
-#include "data/DatabaseConnectors.h"
 #include "data/JSONSQLWriter.h"
 #include "data/SQLRowsetProcessor.h"
-#include "data/WriteCallback.h"
 
 namespace org {
 namespace apache {
@@ -51,72 +41,88 @@ namespace processors {
 
 const std::string ExecuteSQL::ProcessorName("ExecuteSQL");
 
-const core::Property ExecuteSQL::s_sqlSelectQuery(
-  core::PropertyBuilder::createProperty("SQL select query")->isRequired(true)->withDescription(
+const core::Property ExecuteSQL::SQLSelectQuery(
+  core::PropertyBuilder::createProperty("SQL select query")
+  ->withDescription(
     "The SQL select query to execute. The query can be empty, a constant value, or built from attributes using Expression Language. "
     "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
     "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL select query, to be issued by the processor to the database. "
-    "Note that Expression Language is not evaluated for flow file contents.")->supportsExpressionLanguage(true)->build());
+    "Note that Expression Language is not evaluated for flow file contents.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property ExecuteSQL::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("Max Rows Per Flow File")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included intoi a flow file. If zero then all will be placed into the flow file")->supportsExpressionLanguage(true)->build());
+const core::Relationship ExecuteSQL::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Relationship ExecuteSQL::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultRowCount = "executesql.row.count";
+const std::string ExecuteSQL::RESULT_ROW_COUNT = "executesql.row.count";
+const std::string ExecuteSQL::INPUT_FLOW_FILE_UUID = "input.flowfile.uuid";
 
 ExecuteSQL::ExecuteSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid), max_rows_(0) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<ExecuteSQL>::getLogger()) {
 }
 
-ExecuteSQL::~ExecuteSQL() = default;
-
 void ExecuteSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_sqlSelectQuery, s_maxRowsPerFlowFile});
+  setSupportedProperties({ DBControllerService, OutputFormat, SQLSelectQuery, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void ExecuteSQL::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
 
-  context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
-  auto statement = connection_->prepareStatement(sqlSelectQuery_);
+void ExecuteSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto input_flow_file = session.get();
 
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, { &sqlWriter });
+  std::string query;
+  if (!context.getProperty(SQLSelectQuery, query, input_flow_file)) {
+    if (!input_flow_file) {
+      throw Exception(PROCESSOR_EXCEPTION,
+                      "No incoming FlowFile and the \"" + SQLSelectQuery.getName() + "\" processor property is not specified");
+    }
+    logger_->log_debug("Using the contents of the flow file as the SQL statement");
+    auto buffer = std::make_shared<io::BufferStream>();
+    InputStreamPipe content_reader{buffer};
+    session.read(input_flow_file, &content_reader);
+    query = std::string{reinterpret_cast<const char *>(buffer->getBuffer()), buffer->size()};
+  }
+  if (query.empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+
+  auto row_set = connection_->prepareStatement(query)->execute(collectArguments(input_flow_file));
+
+  sql::JSONSQLWriter json_writer{output_format_ == OutputType::JSONPretty};
+  FlowFileGenerator flow_file_creator{session, json_writer};
+  sql::SQLRowsetProcessor sql_rowset_processor(row_set, {json_writer, flow_file_creator});
 
   // Process rowset.
-  do {
-    rowCount = sqlRowsetProcessor.process(max_rows_ == 0 ? std::numeric_limits<size_t>::max() : max_rows_);
-    count++;
-    if (rowCount == 0)
-      break;
-
-    const auto output = sqlWriter.toString();
-    if (!output.empty()) {
-      WriteCallback writer(output);
-      auto newflow = session.create();
-      newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
-      session.write(newflow, &writer);
-      session.transfer(newflow, s_success);
+  while (size_t row_count = sql_rowset_processor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    gsl_Expects(new_file);
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));
+    if (input_flow_file) {
+      new_file->addAttribute(INPUT_FLOW_FILE_UUID, input_flow_file->getUUIDStr());
     }
-  } while (rowCount > 0);
+  }
+
+  // transfer flow files
+  if (input_flow_file) {
+    session.remove(input_flow_file);
+  }
+  for (const auto& new_file : flow_file_creator.getFlowFiles()) {
+    session.transfer(new_file, Success);
+  }
 }
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
index 2a9773d..015b11a 100644
--- a/extensions/sql/processors/ExecuteSQL.h
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -20,16 +20,13 @@
 
 #pragma once
 
-#include "core/Core.h"
-#include "FlowFileRecord.h"
-#include "concurrentqueue.h"
-#include "core/Processor.h"
+#include <string>
+
+#include "core/Resource.h"
 #include "core/ProcessSession.h"
-#include "services/DatabaseService.h"
+#include "core/ProcessContext.h"
 #include "SQLProcessor.h"
-#include "OutputFormat.h"
-
-#include <sstream>
+#include "FlowFileSource.h"
 
 namespace org {
 namespace apache {
@@ -38,33 +35,31 @@ namespace minifi {
 namespace processors {
 
 //! ExecuteSQL Class
-class ExecuteSQL: public SQLProcessor<ExecuteSQL>, public OutputFormat {
+class ExecuteSQL : public SQLProcessor, public FlowFileSource {
  public:
   explicit ExecuteSQL(const std::string& name, utils::Identifier uuid = utils::Identifier());
-  virtual ~ExecuteSQL();
 
   //! Processor Name
   static const std::string ProcessorName;
 
-  void processOnSchedule(core::ProcessContext& context);
-  void processOnTrigger(core::ProcessSession& session);
+  void processOnSchedule(core::ProcessContext& context) override;
+  void processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
 
   void initialize() override;
 
-  static const core::Property s_sqlSelectQuery;
-  static const core::Property s_maxRowsPerFlowFile;
+  static const core::Property SQLSelectQuery;
 
-  static const core::Relationship s_success;
+  static const core::Relationship Success;
+  static const core::Relationship Failure;
 
- private:
-  int max_rows_;
-  std::string sqlSelectQuery_;
+  static const std::string RESULT_ROW_COUNT;
+  static const std::string INPUT_FLOW_FILE_UUID;
 };
 
 REGISTER_RESOURCE(ExecuteSQL, "ExecuteSQL to execute SELECT statement via ODBC.");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/FlowFileSource.cpp b/extensions/sql/processors/FlowFileSource.cpp
new file mode 100644
index 0000000..49c5aff
--- /dev/null
+++ b/extensions/sql/processors/FlowFileSource.cpp
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FlowFileSource.h"
+
+#include "FlowFile.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const core::Property FlowFileSource::OutputFormat(
+  core::PropertyBuilder::createProperty("Output Format")
+  ->isRequired(true)
+  ->supportsExpressionLanguage(true)
+  ->withDefaultValue(toString(OutputType::JSONPretty))
+  ->withAllowableValues<std::string>(OutputType::values())
+  ->withDescription("Set the output format type.")->build());
+
+const core::Property FlowFileSource::MaxRowsPerFlowFile(
+  core::PropertyBuilder::createProperty("Max Rows Per Flow File")
+  ->isRequired(true)
+  ->supportsExpressionLanguage(true)
+  ->withDefaultValue<uint64_t>(0)
+  ->withDescription(
+      "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
+      "If the value specified is zero, then all rows are returned in a single FlowFile.")->build());
+
+const std::string FlowFileSource::FRAGMENT_IDENTIFIER = "fragment.identifier";
+const std::string FlowFileSource::FRAGMENT_COUNT = "fragment.count";
+const std::string FlowFileSource::FRAGMENT_INDEX = "fragment.index";
+
+void FlowFileSource::FlowFileGenerator::endProcessBatch() {
+  if (current_batch_size_ == 0) {
+    // do not create flow files with no rows
+    return;
+  }
+
+  OutputStreamPipe writer{std::make_shared<io::BufferStream>(json_writer_.toString())};
+  auto new_flow = session_.create();
+  new_flow->addAttribute(FRAGMENT_INDEX, std::to_string(flow_files_.size()));
+  new_flow->addAttribute(FRAGMENT_IDENTIFIER, batch_id_.to_string());
+  session_.write(new_flow, &writer);
+  flow_files_.push_back(std::move(new_flow));
+}
+
+void FlowFileSource::FlowFileGenerator::finishProcessing() {
+  // annotate the flow files with the fragment.count
+  std::string fragment_count = std::to_string(flow_files_.size());
+  for (const auto& flow_file : flow_files_) {
+    flow_file->addAttribute(FRAGMENT_COUNT, fragment_count);
+  }
+}
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/FlowFileSource.h b/extensions/sql/processors/FlowFileSource.h
new file mode 100644
index 0000000..8cede63
--- /dev/null
+++ b/extensions/sql/processors/FlowFileSource.h
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <memory>
+
+#include "core/Property.h"
+#include "utils/Enum.h"
+#include "data/SQLRowsetProcessor.h"
+#include "ProcessSession.h"
+#include "data/JSONSQLWriter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FlowFileSource {
+ public:
+  static const std::string FRAGMENT_IDENTIFIER;
+  static const std::string FRAGMENT_COUNT;
+  static const std::string FRAGMENT_INDEX;
+
+  static const core::Property OutputFormat;
+  static const core::Property MaxRowsPerFlowFile;
+
+  SMART_ENUM(OutputType,
+    (JSON, "JSON"),
+    (JSONPretty, "JSON-Pretty")
+  )
+
+ protected:
+  class FlowFileGenerator : public sql::SQLRowSubscriber {
+   public:
+    FlowFileGenerator(core::ProcessSession& session, sql::JSONSQLWriter& json_writer)
+      : session_(session),
+        json_writer_(json_writer) {}
+
+    void beginProcessBatch() override {
+      current_batch_size_ = 0;
+    }
+    void endProcessBatch() override;
+
+    void finishProcessing() override;
+
+    void beginProcessRow() override {}
+    void endProcessRow() override {
+      ++current_batch_size_;
+    }
+    void processColumnNames(const std::vector<std::string>& /*names*/) override {}
+    void processColumn(const std::string& /*name*/, const std::string& /*value*/) override {}
+    void processColumn(const std::string& /*name*/, double /*value*/) override {}
+    void processColumn(const std::string& /*name*/, int /*value*/) override {}
+    void processColumn(const std::string& /*name*/, long long /*value*/) override {}
+    void processColumn(const std::string& /*name*/, unsigned long long /*value*/) override {}
+    void processColumn(const std::string& /*name*/, const char* /*value*/) override {}
+
+    std::shared_ptr<core::FlowFile> getLastFlowFile() const {
+      if (!flow_files_.empty()) {
+        return flow_files_.back();
+      }
+      return {};
+    }
+
+    std::vector<std::shared_ptr<core::FlowFile>>& getFlowFiles() {
+      return flow_files_;
+    }
+
+   private:
+    core::ProcessSession& session_;
+    sql::JSONSQLWriter& json_writer_;
+    const utils::Identifier batch_id_{utils::IdGenerator::getIdGenerator()->generate()};
+    size_t current_batch_size_{0};
+    std::vector<std::shared_ptr<core::FlowFile>> flow_files_;
+  };
+
+  OutputType output_format_;
+  size_t max_rows_{0};
+};
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/OutputFormat.cpp b/extensions/sql/processors/OutputFormat.cpp
deleted file mode 100644
index acd9f69..0000000
--- a/extensions/sql/processors/OutputFormat.cpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "OutputFormat.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-const std::string s_outputFormatJSON = "JSON";
-const std::string s_outputFormatJSONPretty = "JSON-Pretty";
-
-const core::Property& OutputFormat::outputFormat() {
-  static const core::Property s_outputFormat =
-      core::PropertyBuilder::createProperty("Output Format")->
-          isRequired(true)->
-          withDefaultValue(s_outputFormatJSONPretty)->
-          withAllowableValues<std::string>({ s_outputFormatJSON, s_outputFormatJSONPretty })->
-          withDescription("Set the output format type.")->
-          build();
-
-  return s_outputFormat;
-}
-
-bool OutputFormat::isJSONFormat() const {
-  return outputFormat_ == s_outputFormatJSON || outputFormat_ == s_outputFormatJSONPretty;
-}
-
-bool OutputFormat::isJSONPretty() const {
-  return outputFormat_ == s_outputFormatJSONPretty;
-}
-
-void OutputFormat::initOutputFormat(const core::ProcessContext& context) {
-  context.getProperty(outputFormat().getName(), outputFormat_);
-}
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/sql/processors/OutputFormat.h b/extensions/sql/processors/OutputFormat.h
deleted file mode 100644
index e94b538..0000000
--- a/extensions/sql/processors/OutputFormat.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * @file OutputFormat.h
- * OutputFormat class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "core/Core.h"
-#include "core/Processor.h"
-
-#include <string>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-class OutputFormat {
- protected:
-  static const core::Property& outputFormat();
-
-  bool isJSONFormat() const;
-
-  bool isJSONPretty() const;
-
-  void initOutputFormat(const core::ProcessContext& context);
-
- protected:
-   std::string outputFormat_;
-};
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index a76bab2..2e4814b 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -21,23 +21,16 @@
 #include "PutSQL.h"
 
 #include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sstream>
-#include <stdio.h>
 #include <string>
-#include <iostream>
 #include <memory>
-#include <codecvt>
 
 #include <soci/soci.h>
 
 #include "io/BufferStream.h"
+#include "io/StreamPipe.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
-#include "utils/OsUtils.h"
 #include "data/DatabaseConnectors.h"
 #include "data/JSONSQLWriter.h"
 
@@ -49,53 +42,56 @@ namespace processors {
 
 const std::string PutSQL::ProcessorName("PutSQL");
 
-const core::Property PutSQL::s_sqlStatements(
-  core::PropertyBuilder::createProperty("SQL statements")->isRequired(true)->withDefaultValue("System")->withDescription(
-    "A semicolon-delimited list of SQL statements to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. "
-    "If this property is specified, it will be used regardless of the content of incoming flowfiles. "
-    "If this property is empty, the content of the incoming flow file is expected to contain a valid SQL statements, to be issued by the processor to the database.")
-    ->supportsExpressionLanguage(true)->build());
+const core::Property PutSQL::SQLStatement(
+  core::PropertyBuilder::createProperty("SQL Statement")
+  ->isRequired(false)
+  ->withDescription(
+      "The SQL statement to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. "
+      "If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of "
+      "the incoming flow file is expected to contain a valid SQL statement, to be issued by the processor to the database.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Relationship PutSQL::s_success("success", "Database is successfully updated.");
+const core::Relationship PutSQL::Success("success", "Database is successfully updated.");
 
 PutSQL::PutSQL(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<PutSQL>::getLogger()) {
 }
 
-PutSQL::~PutSQL() = default;
-
 void PutSQL::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), s_sqlStatements });
+  setSupportedProperties({ DBControllerService, SQLStatement });
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void PutSQL::processOnSchedule(core::ProcessContext& context) {
-  std::string sqlStatements;
-  context.getProperty(s_sqlStatements.getName(), sqlStatements);
-  sqlStatements_ = utils::StringUtils::split(sqlStatements, ";");
-}
+void PutSQL::processOnSchedule(core::ProcessContext& /*context*/) {}
 
-void PutSQL::processOnTrigger(core::ProcessSession& /*session*/) {
-  const auto dbSession = connection_->getSession();
-
-  try {
-    dbSession->begin();
-    for (const auto& statement : sqlStatements_) {
-      dbSession->execute(statement);
-    }
-    dbSession->commit();
-  } catch (std::exception& e) {
-    logger_->log_error("SQL statement error: %s", e.what());
-    dbSession->rollback();
-    throw;
+void PutSQL::processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  auto flow_file = session.get();
+  if (!flow_file) {
+    context.yield();
+    return;
+  }
+  session.transfer(flow_file, Success);
+
+  std::string sql_statement;
+  if (!context.getProperty(SQLStatement, sql_statement, flow_file)) {
+    logger_->log_debug("Using the contents of the flow file as the SQL statement");
+    auto buffer = std::make_shared<io::BufferStream>();
+    InputStreamPipe read_callback{buffer};
+    session.read(flow_file, &read_callback);
+    sql_statement = std::string{reinterpret_cast<const char*>(buffer->getBuffer()), buffer->size()};
   }
+  if (sql_statement.empty()) {
+    throw Exception(PROCESSOR_EXCEPTION, "Empty SQL statement");
+  }
+
+  connection_->prepareStatement(sql_statement)->execute(collectArguments(flow_file));
 }
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
index 6c5494d..1e4dc9e 100644
--- a/extensions/sql/processors/PutSQL.h
+++ b/extensions/sql/processors/PutSQL.h
@@ -20,16 +20,12 @@
 
 #pragma once
 
-#include "core/Core.h"
-#include "FlowFileRecord.h"
-#include "concurrentqueue.h"
-#include "core/Processor.h"
+#include <string>
+
+#include "core/Resource.h"
 #include "core/ProcessSession.h"
-#include "services/DatabaseService.h"
 #include "SQLProcessor.h"
 
-#include <sstream>
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -37,31 +33,27 @@ namespace minifi {
 namespace processors {
 
 //! PutSQL Class
-class PutSQL: public SQLProcessor<PutSQL> {
+class PutSQL: public SQLProcessor {
  public:
   explicit PutSQL(const std::string& name, utils::Identifier uuid = utils::Identifier());
-  virtual ~PutSQL();
 
   //! Processor Name
   static const std::string ProcessorName;
 
-  void processOnSchedule(core::ProcessContext &context);
-  void processOnTrigger(core::ProcessSession &session);
+  void processOnSchedule(core::ProcessContext& context) override;
+  void processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
   
   void initialize() override;
 
-  static const core::Property s_sqlStatements;
-
-  static const core::Relationship s_success;
+  static const core::Property SQLStatement;
 
- private:
-   std::vector<std::string> sqlStatements_;
+  static const core::Relationship Success;
 };
 
 REGISTER_RESOURCE(PutSQL, "PutSQL to execute SQL command via ODBC.");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/QueryDatabaseTable.cpp b/extensions/sql/processors/QueryDatabaseTable.cpp
index 25c7d8d..a136cb0 100644
--- a/extensions/sql/processors/QueryDatabaseTable.cpp
+++ b/extensions/sql/processors/QueryDatabaseTable.cpp
@@ -21,17 +21,9 @@
 #include "QueryDatabaseTable.h"
 
 #include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sstream>
-#include <cstdio>
 #include <string>
-#include <iostream>
 #include <memory>
-#include <codecvt>
 #include <algorithm>
-#include <regex>
 
 #include <soci/soci.h>
 
@@ -39,14 +31,11 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
-#include "utils/OsUtils.h"
-#include "data/DatabaseConnectors.h"
 #include "data/JSONSQLWriter.h"
 #include "data/SQLRowsetProcessor.h"
-#include "data/WriteCallback.h"
 #include "data/MaxCollector.h"
 #include "data/Utils.h"
-#include "utils/file/FileUtils.h"
+#include "utils/StringUtils.h"
 
 namespace org {
 namespace apache {
@@ -56,17 +45,25 @@ namespace processors {
 
 const std::string QueryDatabaseTable::ProcessorName("QueryDatabaseTable");
 
-const core::Property QueryDatabaseTable::s_tableName(
-  core::PropertyBuilder::createProperty("Table Name")->isRequired(true)->withDescription("The name of the database table to be queried.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::TableName(
+  core::PropertyBuilder::createProperty("Table Name")
+  ->isRequired(true)
+  ->withDescription("The name of the database table to be queried.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_columnNames(
-  core::PropertyBuilder::createProperty("Columns to Return")->isRequired(false)->withDescription(
+const core::Property QueryDatabaseTable::ColumnNames(
+  core::PropertyBuilder::createProperty("Columns to Return")
+  ->isRequired(false)
+  ->withDescription(
     "A comma-separated list of column names to be used in the query. If your database requires special treatment of the names (quoting, e.g.), each name should include such treatment. "
     "If no column names are supplied, all columns in the specified table will be returned. "
-    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")->supportsExpressionLanguage(true)->build());
+    "NOTE: It is important to use consistent column names for a given table for incremental fetch to work properly.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_maxValueColumnNames(
-  core::PropertyBuilder::createProperty("Maximum-value Columns")->isRequired(false)->withDescription(
+const core::Property QueryDatabaseTable::MaxValueColumnNames(
+  core::PropertyBuilder::createProperty("Maximum-value Columns")
+  ->isRequired(false)
+  ->withDescription(
     "A comma-separated list of column names. The processor will keep track of the maximum value for each column that has been returned since the processor started running. "
     "Using multiple columns implies an order to the column list, and each column's values are expected to increase more slowly than the previous columns' values. "
     "Thus, using multiple columns implies a hierarchical structure of columns, which is usually used for partitioning tables. "
@@ -75,361 +72,234 @@ const core::Property QueryDatabaseTable::s_maxValueColumnNames(
     "If no columns are provided, all rows from the table will be considered, which could have a performance impact. "
     "NOTE: It is important to use consistent max-value column names for a given table for incremental fetch to work properly. "
     "NOTE: Because of a limitation of database access library 'soci', which doesn't support milliseconds in it's 'dt_date', "
-    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has value with milliseconds.")->
-    supportsExpressionLanguage(true)->build());
+    "there is a possibility that flowfiles might have duplicated records, if a max-value column with 'dt_date' type has a value with milliseconds.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_whereClause(
-  core::PropertyBuilder::createProperty("db-fetch-where-clause")->isRequired(false)->withDescription(
-    "A custom clause to be added in the WHERE condition when building SQL queries.")->supportsExpressionLanguage(true)->build());
+const core::Property QueryDatabaseTable::WhereClause(
+  core::PropertyBuilder::createProperty("Where Clause")
+  ->isRequired(false)
+  ->withDescription("A custom clause to be added in the WHERE condition when building SQL queries.")
+  ->supportsExpressionLanguage(true)->build());
 
-const core::Property QueryDatabaseTable::s_sqlQuery(
-  core::PropertyBuilder::createProperty("db-fetch-sql-query")->isRequired(false)->withDescription(
-    "A custom SQL query used to retrieve data. Instead of building a SQL query from other properties, this query will be wrapped as a sub-query. "
-    "Query must have no ORDER BY statement.")->supportsExpressionLanguage(true)->build());
+const std::string QueryDatabaseTable::InitialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
-const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
-  core::PropertyBuilder::createProperty("qdbt-max-rows")->isRequired(true)->withDefaultValue<int>(0)->withDescription(
-    "The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large result sets into multiple FlowFiles. "
-    "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
+const core::Relationship QueryDatabaseTable::Success("success", "Successfully created FlowFile from SQL query result set.");
 
-const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
+const std::string QueryDatabaseTable::RESULT_TABLE_NAME = "tablename";
+const std::string QueryDatabaseTable::RESULT_ROW_COUNT = "querydbtable.row.count";
 
-const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
-
-const core::Relationship QueryDatabaseTable::s_success("success", "Successfully created FlowFile from SQL query result set.");
-
-static const std::string ResultTableName = "tablename";
-static const std::string ResultRowCount = "querydbtable.row.count";
-
-static const std::string TABLENAME_KEY = "tablename";
-static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
-
-// State
-class LegacyState {
- public:
-  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
-    :tableName_(tableName), logger_(logger) {
-
-    filePath_ = utils::file::FileUtils::concat_path(
-            utils::file::FileUtils::concat_path(
-              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
-
-    if (!getStateFromFile())
-      return;
-
-    ok_ = true;
-  }
-
-  explicit operator bool() const {
-    return ok_;
-  }
-
-  const std::unordered_map<std::string, std::string>& getStateMap() const {
-    return mapState_;
-  }
-
-  bool moveStateFileToMigrated() {
-    if (!ok_) {
-      return false;
-    }
-    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
-  }
-
- private:
-  static const std::string separator_;
-
-   bool getStateFromFile() {
-     std::string state;
-
-     std::ifstream file(filePath_);
-     if (!file) {
-       return false;
-     }
-
-     std::stringstream ss;
-     ss << file.rdbuf();
-
-     state = ss.str();
-
-     file.close();
-
-     std::vector<std::string> listColumnNameValue;
-
-     size_t pos = state.find(separator_, 0);
-     if (pos == std::string::npos) {
-       logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     auto tableName = state.substr(0, pos);
-     if (tableName != tableName_) {
-       logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
-       mapState_.clear();
-
-       return false;
-     }
-
-     pos += separator_.size();
-
-     while (true) {
-       auto newPos = state.find(separator_, pos);
-       if (newPos == std::string::npos)
-         break;
-
-       const std::string& columnNameValue = state.substr(pos, newPos - pos);
-       listColumnNameValue.emplace_back(columnNameValue);
-
-       pos = newPos + separator_.size();
-     }
-
-     for (const auto& columnNameValue : listColumnNameValue) {
-       const auto posEQ = columnNameValue.find('=');
-       if (posEQ == std::string::npos) {
-         logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
-         mapState_.clear();
-         return false;
-       }
-
-       const auto& name = columnNameValue.substr(0, posEQ);
-       const auto& value = columnNameValue.substr(posEQ + 1);
-
-       mapState_.insert({ name, value });
-     }
-
-     return true;
-   }
-
- private:
-   std::unordered_map<std::string, std::string> mapState_;
-   std::string filePath_;
-   std::string tableName_;
-   std::shared_ptr<logging::Logger> logger_;
-   bool ok_{};
-};
-
-const std::string LegacyState::separator_ = "@!qdt!@";
+const std::string QueryDatabaseTable::TABLENAME_KEY = "tablename";
+const std::string QueryDatabaseTable::MAXVALUE_KEY_PREFIX = "maxvalue.";
 
 // QueryDatabaseTable
 QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
-  : SQLProcessor(name, uuid) {
+  : SQLProcessor(name, uuid, logging::LoggerFactory<QueryDatabaseTable>::getLogger()) {
 }
 
-QueryDatabaseTable::~QueryDatabaseTable() = default;
-
 void QueryDatabaseTable::initialize() {
   //! Set the supported properties
-  setSupportedProperties({ dbControllerService(), outputFormat(), s_tableName, s_columnNames, s_maxValueColumnNames, s_whereClause, s_sqlQuery, s_maxRowsPerFlowFile, s_stateDirectory});
+  setSupportedProperties({
+    DBControllerService, OutputFormat, TableName, ColumnNames,
+    MaxValueColumnNames, WhereClause, MaxRowsPerFlowFile});
 
   //! Set the supported relationships
-  setSupportedRelationships({ s_success });
+  setSupportedRelationships({ Success });
 }
 
-void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
-  initOutputFormat(context);
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext& context) {
+  context.getProperty(OutputFormat.getName(), output_format_);
+  max_rows_ = [&] {
+    uint64_t max_rows;
+    context.getProperty(MaxRowsPerFlowFile.getName(), max_rows);
+    return gsl::narrow<size_t>(max_rows);
+  }();
 
-  context.getProperty(s_tableName.getName(), tableName_);
-  context.getProperty(s_columnNames.getName(), columnNames_);
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
+  context.getProperty(TableName.getName(), table_name_);
+  context.getProperty(WhereClause.getName(), extra_where_clause_);
+  max_value_columns_ = [&] {
+    std::string max_value_columns_str;
+    context.getProperty(MaxValueColumnNames.getName(), max_value_columns_str);
+    return utils::inputStringToList(max_value_columns_str);
+  }();
+  return_columns_ = [&] {
+    std::string return_columns_str;
+    context.getProperty(ColumnNames.getName(), return_columns_str);
+    return utils::inputStringToList(return_columns_str);
+  }();
+  queried_columns_ = utils::StringUtils::join(", ", return_columns_);
+  if (!queried_columns_.empty() && !max_value_columns_.empty()) {
+    // columns will be explicitly enumerated, we need to add the max value columns
+    queried_columns_ = queried_columns_ + ", " + utils::StringUtils::join(", ", max_value_columns_);
+  }
 
-  context.getProperty(s_maxValueColumnNames.getName(), maxValueColumnNames_);
-  listMaxValueColumnName_ = utils::inputStringToList(maxValueColumnNames_);
+  initializeMaxValues(context);
+}
 
-  context.getProperty(s_whereClause.getName(), whereClause_);
-  context.getProperty(s_sqlQuery.getName(), sqlQuery_);
-  context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
+void QueryDatabaseTable::processOnTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) {
+  const auto& selectQuery = buildSelectQuery();
 
-  mapState_.clear();
+  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
 
-  state_manager_ = context.getStateManager();
-  if (state_manager_ == nullptr) {
-    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  auto statement = connection_->prepareStatement(selectQuery);
+
+  auto rowset = statement->execute();
+
+  std::unordered_map<std::string, std::string> new_max_values = max_values_;
+  sql::MaxCollector maxCollector{selectQuery, new_max_values};
+  auto column_filter = [&] (const std::string& column_name) {
+    return return_columns_.empty()
+      || std::find(return_columns_.begin(), return_columns_.end(), column_name) != return_columns_.end();
+  };
+  sql::JSONSQLWriter json_writer{output_format_ == OutputType::JSONPretty, column_filter};
+  FlowFileGenerator flow_file_creator{session, json_writer};
+  sql::SQLRowsetProcessor sql_rowset_processor(rowset, {json_writer, maxCollector, flow_file_creator});
+
+  while (size_t row_count = sql_rowset_processor.process(max_rows_)) {
+    auto new_file = flow_file_creator.getLastFlowFile();
+    gsl_Expects(new_file);
+    new_file->addAttribute(RESULT_ROW_COUNT, std::to_string(row_count));
+    new_file->addAttribute(RESULT_TABLE_NAME, table_name_);
   }
 
-  std::unordered_map<std::string, std::string> state_map;
-  if (state_manager_->get(state_map)) {
-    if (state_map[TABLENAME_KEY] != tableName_) {
-      state_manager_->clear();
-    } else {
-      for (auto&& elem : state_map) {
-        if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
-          mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second));
-        }
-      }
+  // the updated max_values and the total number of flow_files is available from here
+  for (auto& new_file : flow_file_creator.getFlowFiles()) {
+    session.transfer(new_file, Success);
+    for (const auto& max_column : max_value_columns_) {
+      new_file->addAttribute("maxvalue." + max_column, new_max_values[max_column]);
     }
-  } else {
-    // Try to migrate legacy state file
-    std::string stateDir;
-    context.getProperty(s_stateDirectory.getName(), stateDir);
-    if (!stateDir.empty()) {
-      LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_);
-      if (legacyState) {
-        mapState_ = legacyState.getStateMap();
-        if (saveState() && state_manager_->persist()) {
-          logger_->log_info("State migration successful");
-          legacyState.moveStateFileToMigrated();
-        } else {
-          logger_->log_warn("Failed to persists migrated state");
-        }
+  }
+
+  if (new_max_values != max_values_) {
+    session.commit();
+    max_values_ = new_max_values;
+    saveState();
+  }
+}
+
+bool QueryDatabaseTable::loadMaxValuesFromStoredState(const std::unordered_map<std::string, std::string> &state) {
+  std::unordered_map<std::string, std::string> new_max_values;
+  if (state.count(TABLENAME_KEY) == 0) {
+    logger_->log_info("State does not specify the table name.");
+    return false;
+  }
+  if (state.at(TABLENAME_KEY) != table_name_) {
+    logger_->log_info("Querying new table \"%s\", resetting state.", table_name_);
+    return false;
+  }
+  for (auto& elem : state) {
+    if (utils::StringUtils::startsWith(elem.first, MAXVALUE_KEY_PREFIX)) {
+      std::string column_name = elem.first.substr(MAXVALUE_KEY_PREFIX.length());
+      // add only those columns that we care about
+      if (std::find(max_value_columns_.begin(), max_value_columns_.end(), column_name) != max_value_columns_.end()) {
+        new_max_values.emplace(column_name, elem.second);
       } else {
-        logger_->log_warn("Could not migrate state from specified State Directory %s", stateDir);
+        logger_->log_info("State contains obsolete maximum-value column \"%s\", resetting state.", column_name);
+        return false;
       }
     }
   }
+  for (auto& column : max_value_columns_) {
+    if (new_max_values.find(column) == new_max_values.end()) {
+      logger_->log_info("New maximum-value column \"%s\" specified, resetting state.", column);
+      return false;
+    }
+  }
+  max_values_ = new_max_values;
+  return true;
+}
 
-  // If 'listMaxValueColumnName_' doesn't match columns in mapState_, then clear mapState_.
-  if (listMaxValueColumnName_.size() != mapState_.size()) {
-    mapState_.clear();
+void QueryDatabaseTable::initializeMaxValues(core::ProcessContext &context) {
+  max_values_.clear();
+  std::unordered_map<std::string, std::string> stored_state;
+  if (!state_manager_->get(stored_state)) {
+    logger_->log_info("Found no stored state");
   } else {
-    for (const auto& columName : listMaxValueColumnName_) {
-      if (0 == mapState_.count(columName)) {
-        mapState_.clear();
-        break;
-      }
+    if (!loadMaxValuesFromStoredState(stored_state)) {
+      state_manager_->clear();
     }
   }
 
-  // Add in 'mapState_' new columns which are in 'listMaxValueColumnName_'.
-  for (const auto& maxValueColumnName: listMaxValueColumnName_) {
-    if (0 == mapState_.count(maxValueColumnName)) {
-      mapState_.insert({maxValueColumnName, std::string()});
-    }
+  for (const auto& column_name : max_value_columns_) {
+    // initialize column values
+    max_values_[column_name];
   }
 
+  loadMaxValuesFromDynamicProperties(context);
+}
+
+void QueryDatabaseTable::loadMaxValuesFromDynamicProperties(core::ProcessContext &context) {
   const auto dynamic_prop_keys = context.getDynamicPropertyKeys();
   logger_->log_info("Received %zu dynamic properties", dynamic_prop_keys.size());
 
-  // If the stored state for a max value column is empty, populate it with the corresponding initial max value, if it exists.
   for (const auto& key : dynamic_prop_keys) {
-    if (std::string::npos == key.rfind(s_initialMaxValueDynamicPropertyPrefix, 0)) {
+    if (!utils::StringUtils::startsWith(key, InitialMaxValueDynamicPropertyPrefix)) {
       throw minifi::Exception(PROCESSOR_EXCEPTION, "QueryDatabaseTable: Unsupported dynamic property \"" + key + "\"");
     }
-    const auto columnName = utils::toLower(key.substr(s_initialMaxValueDynamicPropertyPrefix.length()));
-    auto it = mapState_.find(columnName);
-    if (it == mapState_.end()) {
-      logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", columnName);
+    const auto column_name = utils::StringUtils::toLower(key.substr(InitialMaxValueDynamicPropertyPrefix.length()));
+    auto it = max_values_.find(column_name);
+    if (it == max_values_.end()) {
+      logger_->log_warn("Initial maximum value specified for column \"%s\", which is not specified as a Maximum-value Column. Ignoring.", column_name);
       continue;
     }
+    // do not overwrite existing max value
     if (!it->second.empty()) {
       continue;
     }
     std::string value;
     if (context.getDynamicProperty(key, value) && !value.empty()) {
       it->second = value;
-      logger_->log_info("Setting initial maximum value of %s to %s", columnName, value);
+      logger_->log_info("Setting initial maximum value of %s to %s", column_name, value);
     }
   }
 }
 
-void QueryDatabaseTable::processOnTrigger(core::ProcessSession& session) {
-  const auto& selectQuery = getSelectQuery();
+std::string QueryDatabaseTable::buildSelectQuery() {
+  std::string query = "select " + (queried_columns_.empty() ? "*" : queried_columns_) + " from " + table_name_;
 
-  logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
-
-  auto statement = connection_->prepareStatement(selectQuery);
-
-  auto rowset = statement->execute();
-
-  int count = 0;
-  size_t rowCount = 0;
-  sql::MaxCollector maxCollector(selectQuery, maxValueColumnNames_, mapState_);
-  sql::JSONSQLWriter sqlWriter(isJSONPretty());
-  sql::SQLRowsetProcessor sqlRowsetProcessor(rowset, {&sqlWriter, &maxCollector});
-
-  // Process rowset.
-  do {
-    rowCount = sqlRowsetProcessor.process(maxRowsPerFlowFile_ == 0 ? std::numeric_limits<size_t>::max() : maxRowsPerFlowFile_);
-    count++;
-    if (rowCount == 0)
-      break;
-
-    const auto output = sqlWriter.toString();
-    if (!output.empty()) {
-      WriteCallback writer(output);
-      auto newflow = session.create();
-      newflow->addAttribute(ResultRowCount, std::to_string(rowCount));
-      newflow->addAttribute(ResultTableName, tableName_);
-      session.write(newflow, &writer);
-      session.transfer(newflow, s_success);
-    }
-  } while (rowCount > 0);
-
-  const auto mapState = mapState_;
-  if (maxCollector.updateMapState()) {
-    try {
-      session.commit();
-    } catch (std::exception& e) {
-      mapState_ = mapState;
-      throw;
-    }
-
-    saveState();
-  }
-}
+  std::vector<std::string> where_clauses;
 
-std::string QueryDatabaseTable::getSelectQuery() {
-  std::string ret;
-
-  if (sqlQuery_.empty()) {
-    std::string columns;
-    if (columnNames_.empty()) {
-      columns = "*";
-    } else {
-      columns = columnNames_;
+  for (size_t index = 0; index < max_value_columns_.size(); index++) {
+    const auto& column_name = max_value_columns_[index];
+    const auto& max_value = max_values_[column_name];
+    if (max_value.empty()) {
+      // max value has not been set for this column
+      continue;
     }
-    ret = "select " + columns + " from " + tableName_;
-  } else {
-    ret = sqlQuery_;
-  }
-
-  std::string whereClauses;
-
-  if (!mapState_.empty() && !listMaxValueColumnName_.empty()) {
-    for (auto index = 0U; index < listMaxValueColumnName_.size(); index++) {
-      const auto& columnName = listMaxValueColumnName_[index];
 
-      const auto itState = mapState_.find(columnName);
-
-      const auto& maxValue = itState->second;
-      if (maxValue.empty()) {
-        continue;
-      }
-
-      // Logic to differentiate ">" vs ">=" based on index is copied from:
-      // https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
-      // (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712.
-      if (index == 0) {
-        whereClauses += columnName + " > ";
-      } else {
-        whereClauses += " and " + columnName + " >= ";
-      }
-      whereClauses += maxValue;
-    }
+    // Logic to differentiate ">" vs ">=" based on index is copied from:
+    // https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+    // (under comment "Add a condition for the WHERE clause"). And implementation explanation: https://issues.apache.org/jira/browse/NIFI-2712.
+    where_clauses.push_back(utils::StringUtils::join_pack(column_name, index == 0 ? " > " : " >= ", max_value));
   }
 
-  if (!whereClause_.empty()) {
-    whereClauses += " and " + whereClause_;
+  if (!extra_where_clause_.empty()) {
+    where_clauses.push_back(extra_where_clause_);
   }
 
-  if (!whereClauses.empty()) {
-    ret += " where " + whereClauses;
+  if (!where_clauses.empty()) {
+    query += " where " + utils::StringUtils::join(" and ", where_clauses);
   }
 
-  return ret;
+  return query;
 }
 
 bool QueryDatabaseTable::saveState() {
   std::unordered_map<std::string, std::string> state_map;
-  state_map.emplace(TABLENAME_KEY, tableName_);
-  for (const auto& elem : mapState_) {
-    state_map.emplace(MAXVALUE_KEY_PREFIX + elem.first, elem.second);
+  state_map.emplace(TABLENAME_KEY, table_name_);
+  for (const auto& item : max_values_) {
+    state_map.emplace(MAXVALUE_KEY_PREFIX + item.first, item.second);
   }
   return state_manager_->set(state_map);
 }
 
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
index 44f1ed7..de397c0 100644
--- a/extensions/sql/processors/QueryDatabaseTable.h
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -20,17 +20,15 @@
 
 #pragma once
 
-#include "core/Core.h"
-#include "FlowFileRecord.h"
-#include "concurrentqueue.h"
-#include "core/Processor.h"
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <memory>
+
+#include "core/Resource.h"
 #include "core/ProcessSession.h"
-#include "services/DatabaseService.h"
 #include "SQLProcessor.h"
-#include "OutputFormat.h"
-
-#include <sstream>
-#include <unordered_map>
+#include "FlowFileSource.h"
 
 namespace org {
 namespace apache {
@@ -38,60 +36,61 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class State;
-
 //! QueryDatabaseTable Class
-class QueryDatabaseTable: public SQLProcessor<QueryDatabaseTable>, public OutputFormat {
+class QueryDatabaseTable: public SQLProcessor, public FlowFileSource {
  public:
   explicit QueryDatabaseTable(const std::string& name, utils::Identifier uuid = utils::Identifier());
-  virtual ~QueryDatabaseTable();
+
+  static const std::string RESULT_TABLE_NAME;
+  static const std::string RESULT_ROW_COUNT;
+
+  static const std::string TABLENAME_KEY;
+  static const std::string MAXVALUE_KEY_PREFIX;
 
   //! Processor Name
   static const std::string ProcessorName;
 
-  static const core::Property s_tableName;
-  static const core::Property s_columnNames;
-  static const core::Property s_maxValueColumnNames;
-  static const core::Property s_whereClause;
-  static const core::Property s_sqlQuery;
-  static const core::Property s_maxRowsPerFlowFile;
-  static const core::Property s_stateDirectory;
+  static const core::Property TableName;
+  static const core::Property ColumnNames;
+  static const core::Property MaxValueColumnNames;
+  static const core::Property WhereClause;
 
-  static const std::string s_initialMaxValueDynamicPropertyPrefix;
+  static const std::string InitialMaxValueDynamicPropertyPrefix;
 
-  static const core::Relationship s_success;
+  static const core::Relationship Success;
 
   bool supportsDynamicProperties() override {
     return true;
   }
 
-  void processOnSchedule(core::ProcessContext& context);
-  void processOnTrigger(core::ProcessSession& session);
+  void processOnSchedule(core::ProcessContext& context) override;
+  void processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
 
   void initialize() override;
 
  private:
-  std::string getSelectQuery();
+  std::string buildSelectQuery();
+
+  void initializeMaxValues(core::ProcessContext& context);
+  bool loadMaxValuesFromStoredState(const std::unordered_map<std::string, std::string>& state);
+  void loadMaxValuesFromDynamicProperties(core::ProcessContext& context);
 
   bool saveState();
 
  private:
   std::shared_ptr<core::CoreComponentStateManager> state_manager_;
-  std::string tableName_;
-  std::string columnNames_;
-  std::string maxValueColumnNames_;
-  std::string whereClause_;
-  std::string sqlQuery_;
-  int maxRowsPerFlowFile_{};
-  std::vector<std::string> listMaxValueColumnName_;
-  std::unordered_map<std::string, std::string> mapState_;
-  std::unordered_map<std::string, soci::data_type> mapColumnType_;
+  std::string table_name_;
+  std::vector<std::string> return_columns_;
+  std::string queried_columns_;
+  std::string extra_where_clause_;
+  std::vector<std::string> max_value_columns_;
+  std::unordered_map<std::string, std::string> max_values_;
 };
 
 REGISTER_RESOURCE(QueryDatabaseTable, "QueryDatabaseTable to execute SELECT statement via ODBC.");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/SQLProcessor.cpp b/extensions/sql/processors/SQLProcessor.cpp
new file mode 100644
index 0000000..5238a1c
--- /dev/null
+++ b/extensions/sql/processors/SQLProcessor.cpp
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SQLProcessor.h"
+
+#include <vector>
+#include <memory>
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "Exception.h"
+
+#include <soci/error.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const core::Property SQLProcessor::DBControllerService(
+    core::PropertyBuilder::createProperty("DB Controller Service")
+    ->isRequired(true)
+    ->withDescription("Database Controller Service.")
+    ->supportsExpressionLanguage(true)->build());
+
+void SQLProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
+  std::string controllerService;
+  context->getProperty(DBControllerService.getName(), controllerService);
+
+  db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
+  if (!db_service_) {
+    throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + DBControllerService.getName() + "' must be defined");
+  }
+
+  processOnSchedule(*context);
+}
+
+void SQLProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  std::lock_guard<std::mutex> guard(on_trigger_mutex_);
+
+  try {
+    if (!connection_) {
+      connection_ = db_service_->getConnection();
+    }
+    processOnTrigger(*context, *session);
+  } catch (const soci::soci_error& e) {
+    logger_->log_error("SQLProcessor: '%s'", e.what());
+    if (connection_) {
+      std::string exp;
+      if (!connection_->connected(exp)) {
+        logger_->log_error("SQLProcessor: Connection exception: %s", exp);
+        // try to reconnect next time
+        connection_.reset();
+      }
+    }
+    throw;
+  }
+}
+
+std::vector<std::string> SQLProcessor::collectArguments(const std::shared_ptr<core::FlowFile> &flow_file) {
+  if (!flow_file) {
+    return {};
+  }
+  std::vector<std::string> arguments;
+  for (size_t arg_idx{1};; ++arg_idx) {
+    std::string arg;
+    if (!flow_file->getAttribute("sql.args." + std::to_string(arg_idx) + ".value", arg)) {
+      break;
+    }
+    arguments.push_back(std::move(arg));
+  }
+  return arguments;
+}
+
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/processors/SQLProcessor.h b/extensions/sql/processors/SQLProcessor.h
index 900cf65..73e50b3 100644
--- a/extensions/sql/processors/SQLProcessor.h
+++ b/extensions/sql/processors/SQLProcessor.h
@@ -21,10 +21,11 @@
 #pragma once
 
 #include "core/Core.h"
-#include "FlowFileRecord.h"
-#include "concurrentqueue.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
+#include "utils/Enum.h"
+
+#include "services/DatabaseService.h"
 
 namespace org {
 namespace apache {
@@ -32,74 +33,38 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-template <typename T>
 class SQLProcessor: public core::Processor {
+ public:
+  static const core::Property DBControllerService;
+
  protected:
-  SQLProcessor(const std::string& name, utils::Identifier uuid)
-    : core::Processor(name, uuid), logger_(logging::LoggerFactory<T>::getLogger()) {
+  SQLProcessor(const std::string& name, utils::Identifier uuid, std::shared_ptr<logging::Logger> logger)
+    : core::Processor(name, uuid), logger_(std::move(logger)) {
   }
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) override {
-    std::string controllerService;
-    context->getProperty(dbControllerService().getName(), controllerService);
-
-    dbService_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(context->getControllerService(controllerService));
-    if (!dbService_)
-      throw minifi::Exception(PROCESSOR_EXCEPTION, "'DB Controller Service' must be defined");
+  static std::vector<std::string> collectArguments(const std::shared_ptr<core::FlowFile>& flow_file);
 
-    static_cast<T*>(this)->processOnSchedule(*context);
-  }
+  virtual void processOnSchedule(core::ProcessContext& context) = 0;
+  virtual void processOnTrigger(core::ProcessContext& context, core::ProcessSession& session) = 0;
 
-  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override {
-    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
-    if (!lock.owns_lock()) {
-      logger_->log_warn("'onTrigger' is called before previous 'onTrigger' call is finished.");
-      context->yield();
-      return;
-    }
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
 
-    try {
-      if (!connection_) {
-        connection_ = dbService_->getConnection();
-      }
-      static_cast<T*>(this)->processOnTrigger(*session);
-    } catch (std::exception& e) {
-      logger_->log_error("SQLProcessor: '%s'", e.what());
-      if (connection_) {
-        std::string exp;
-        if (!connection_->connected(exp)) {
-          logger_->log_error("SQLProcessor: Connection exception: %s", exp.c_str());
-          connection_.reset();
-        }
-      }
-      context->yield();
-    }
-  }
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
 
   void notifyStop() override {
     connection_.reset();
   }
 
  protected:
-   static const core::Property& dbControllerService() {
-     static const core::Property s_dbControllerService =
-       core::PropertyBuilder::createProperty("DB Controller Service")->
-       isRequired(true)->
-       withDescription("Database Controller Service.")->
-       supportsExpressionLanguage(true)->
-       build();
-     return s_dbControllerService;
-   }
-
    std::shared_ptr<logging::Logger> logger_;
-   std::shared_ptr<sql::controllers::DatabaseService> dbService_;
+   std::shared_ptr<sql::controllers::DatabaseService> db_service_;
    std::unique_ptr<sql::Connection> connection_;
-   std::mutex onTriggerMutex_;
+   std::mutex on_trigger_mutex_;
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace processors
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
 
diff --git a/extensions/sql/services/ODBCConnector.h b/extensions/sql/services/ODBCConnector.h
index 59b2643..64d511c 100644
--- a/extensions/sql/services/ODBCConnector.h
+++ b/extensions/sql/services/ODBCConnector.h
@@ -21,6 +21,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "core/controller/ControllerService.h"
 
+#include "utils/GeneralUtils.h"
 #include "DatabaseService.h"
 #include "core/Resource.h"
 #include "data/DatabaseConnectors.h"
@@ -41,13 +42,11 @@ namespace controllers {
 
 class ODBCConnection : public sql::Connection {
  public:
-  explicit ODBCConnection(const std::string& connectionString)
-    : connection_string_(connectionString) {
-      session_ = std::make_unique<soci::session>(getSessionParameters());
+  explicit ODBCConnection(std::string connectionString)
+    : connection_string_(std::move(connectionString)) {
+      session_ = utils::make_unique<soci::session>(getSessionParameters());
   }
 
-  virtual ~ODBCConnection() = default;
-
   bool connected(std::string& exception) const override {
     try {
       exception.clear();
@@ -55,18 +54,18 @@ class ODBCConnection : public sql::Connection {
       // 'select 1' works for: H2, MySQL, Microsoft SQL Server, PostgreSQL, SQLite. For Oracle 'SELECT 1 FROM DUAL' works.
       prepareStatement("select 1")->execute();
       return true;
-    } catch (std::exception& e) {
+    } catch (const std::exception& e) {
       exception = e.what();
       return false;
     }
   }
 
   std::unique_ptr<sql::Statement> prepareStatement(const std::string& query) const override {
-    return std::make_unique<sql::Statement>(*session_, query);
+    return utils::make_unique<sql::Statement>(*session_, query);
   }
 
   std::unique_ptr<Session> getSession() const override {
-    return std::make_unique<sql::Session>(*session_);
+    return utils::make_unique<sql::Session>(*session_);
   }
 
  private:
diff --git a/extensions/sqlite/CMakeLists.txt b/extensions/sqlite/CMakeLists.txt
deleted file mode 100644
index 53d6370..0000000
--- a/extensions/sqlite/CMakeLists.txt
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
-
-file(GLOB SOURCES "*.cpp")
-
-add_library(minifi-sqlite-extensions STATIC ${SOURCES})
-set_property(TARGET minifi-sqlite-extensions PROPERTY POSITION_INDEPENDENT_CODE ON)
-
-target_link_libraries(minifi-sqlite-extensions ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-sqlite-extensions SQLite::SQLite3)
-
-
-SET (SQLITE-EXTENSIONS minifi-sqlite-extensions PARENT_SCOPE)
-
-register_extension(minifi-sqlite-extensions)
diff --git a/extensions/sqlite/ExecuteSQL.cpp b/extensions/sqlite/ExecuteSQL.cpp
deleted file mode 100644
index 710ce57..0000000
--- a/extensions/sqlite/ExecuteSQL.cpp
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ExecuteSQL.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property ExecuteSQL::ConnectionURL(  // NOLINT
-    "Connection URL",
-    "The database URL to connect to",
-    "");
-core::Property ExecuteSQL::SQLStatement(  // NOLINT
-    "SQL Statement",
-    "The SQL statement to execute",
-    "");
-
-core::Relationship ExecuteSQL::Success(  // NOLINT
-    "success",
-    "After a successful SQL execution, result FlowFiles are sent here");
-core::Relationship ExecuteSQL::Original(  // NOLINT
-    "original",
-    "The original FlowFile is sent here");
-core::Relationship ExecuteSQL::Failure(  // NOLINT
-    "failure",
-    "Failures which will not work if retried");
-
-void ExecuteSQL::initialize() {
-  std::set<core::Property> properties;
-  properties.insert(ConnectionURL);
-  properties.insert(SQLStatement);
-  setSupportedProperties(std::move(properties));
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Original);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-}
-
-void ExecuteSQL::onSchedule(core::ProcessContext *context,
-                            core::ProcessSessionFactory* /*sessionFactory*/) {
-  context->getProperty(ConnectionURL.getName(), db_url_);
-
-  if (db_url_.empty()) {
-    logger_->log_error("Invalid Connection URL");
-  }
-
-  context->getProperty(SQLStatement.getName(), sql_);
-}
-
-void ExecuteSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                           const std::shared_ptr<core::ProcessSession> &session) {
-  auto flow_file = session->get();
-
-  try {
-    // Use an existing context, if one is available
-    std::shared_ptr<minifi::sqlite::SQLiteConnection> db;
-
-    if (conn_q_.try_dequeue(db)) {
-      logger_->log_debug("Using available SQLite connection");
-    }
-
-    if (!db) {
-      logger_->log_info("Creating new SQLite connection");
-      if (db_url_.substr(0, 9) == "sqlite://") {
-        db = std::make_shared<minifi::sqlite::SQLiteConnection>(db_url_.substr(9));
-      } else {
-        std::stringstream err_msg;
-        err_msg << "Connection URL '" << db_url_ << "' is unsupported";
-        logger_->log_error(err_msg.str().c_str());
-        throw std::runtime_error("Connection Error");
-      }
-    }
-
-    auto dynamic_sql = std::make_shared<std::string>();
-
-    if (flow_file) {
-      if (sql_.empty()) {
-        // SQL is not defined as a property, so get SQL from the file content
-        SQLReadCallback cb(dynamic_sql);
-        session->read(flow_file, &cb);
-      } else {
-        // SQL is defined as a property, so get the property dynamically w/ EL support
-        context->getProperty(SQLStatement, *dynamic_sql, flow_file);
-      }
-    }
-
-    auto stmt = flow_file
-                ? db->prepare(*dynamic_sql)
-                : db->prepare(sql_);
-
-    if (flow_file) {
-      for (int i = 1; i < INT_MAX; i++) {
-        std::string val;
-        std::stringstream val_key;
-        val_key << "sql.args." << i << ".value";
-
-        if (!flow_file->getAttribute(val_key.str(), val)) {
-          break;
-        }
-
-        stmt.bind_text(i, val);
-      }
-    }
-
-    stmt.step();
-
-    if (!stmt.is_ok()) {
-      logger_->log_error("SQL statement execution failed: %s", db->errormsg());
-      session->transfer(flow_file, Failure);
-    }
-
-    auto num_cols = stmt.column_count();
-    std::vector<std::string> col_names;
-
-    for (int i = 0; i < num_cols; i++) {
-      col_names.emplace_back(stmt.column_name(i));
-    }
-
-    while (stmt.is_ok() && !stmt.is_done()) {
-      auto result_ff = session->create();
-
-      for (int i = 0; i < num_cols; i++) {
-        result_ff->addAttribute(col_names[i], stmt.column_text(i));
-      }
-
-      session->transfer(result_ff, Success);
-      stmt.step();
-    }
-
-    if (flow_file) {
-      session->transfer(flow_file, Original);
-    }
-
-    // Make connection available for use again
-    if (conn_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing SQLite connection");
-      conn_q_.enqueue(db);
-    } else {
-      logger_->log_info("Destroying SQLite connection because it is no longer needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-    if (flow_file) {
-      session->transfer(flow_file, Failure);
-    }
-    this->yield();
-  } catch (...) {
-    logger_->log_error("Caught Exception");
-    if (flow_file) {
-      session->transfer(flow_file, Failure);
-    }
-    this->yield();
-  }
-}
-
-int64_t ExecuteSQL::SQLReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  sql_->resize(stream->size());
-  auto num_read = static_cast<uint64_t >(stream->read(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
-                                                          static_cast<int>(stream->size())));
-
-  if (num_read != stream->size()) {
-    throw std::runtime_error("SQLReadCallback failed to fully read flow file input stream");
-  }
-
-  return num_read;
-}
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/sqlite/ExecuteSQL.h b/extensions/sqlite/ExecuteSQL.h
deleted file mode 100644
index 04e59b1..0000000
--- a/extensions/sqlite/ExecuteSQL.h
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef NIFI_MINIFI_CPP_EXECUTESQL_H
-#define NIFI_MINIFI_CPP_EXECUTESQL_H
-
-#include <core/Resource.h>
-#include <core/Processor.h>
-
-#include <concurrentqueue.h>
-
-#include "SQLiteConnection.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-class ExecuteSQL : public core::Processor {
- public:
-  explicit ExecuteSQL(const std::string &name, utils::Identifier uuid = utils::Identifier())
-      : Processor(name, uuid),
-        logger_(logging::LoggerFactory<ExecuteSQL>::getLogger()) {
-  }
-
-  static core::Property ConnectionURL;
-  static core::Property SQLStatement;
-
-  static core::Relationship Success;
-  static core::Relationship Original;
-  static core::Relationship Failure;
-
-  void initialize() override;
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
-  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
-    logger_->log_error("onTrigger invocation with raw pointers is not implemented");
-  }
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                 const std::shared_ptr<core::ProcessSession> &session) override;
-
-  class SQLReadCallback : public InputStreamCallback {
-   public:
-    explicit SQLReadCallback(std::shared_ptr<std::string> sql)
-        : sql_(std::move(sql)) {
-    }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<std::string> sql_;
-  };
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  moodycamel::ConcurrentQueue<std::shared_ptr<minifi::sqlite::SQLiteConnection>> conn_q_;
-
-  std::string db_url_;
-  std::string sql_;
-};
-
-REGISTER_RESOURCE(ExecuteSQL, "Execute provided SQL query. Query result rows will be outputted as new flow files with attribute keys equal to "
-    "result column names and values equal to result values. There will be one output FlowFile per result row. This processor can be scheduled to "
-    "run using the standard timer-based scheduling methods, or it can be triggered by an incoming FlowFile. If it is triggered by an incoming FlowFile, "
-    "then attributes of that FlowFile will be available when evaluating the query."); // NOLINT
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_EXECUTESQL_H
diff --git a/extensions/sqlite/PutSQL.cpp b/extensions/sqlite/PutSQL.cpp
deleted file mode 100644
index 0ad0082..0000000
--- a/extensions/sqlite/PutSQL.cpp
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "PutSQL.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-core::Property PutSQL::ConnectionURL(  // NOLINT
-    "Connection URL",
-    "The database URL to connect to",
-    "");
-core::Property PutSQL::SQLStatement(  // NOLINT
-    "SQL Statement",
-    "The SQL statement to execute",
-    "");
-core::Property PutSQL::BatchSize(  // NOLINT
-    "Batch Size",
-    "The maximum number of flow files to process in one batch",
-    "1");
-
-core::Relationship PutSQL::Success(  // NOLINT
-    "success",
-    "After a successful put SQL operation, FlowFiles are sent here");
-core::Relationship PutSQL::Retry(  // NOLINT
-    "retry",
-    "Failures which might work if retried");
-core::Relationship PutSQL::Failure(  // NOLINT
-    "failure",
-    "Failures which will not work if retried");
-
-void PutSQL::initialize() {
-  std::set<core::Property> properties;
-  properties.insert(ConnectionURL);
-  properties.insert(BatchSize);
-  properties.insert(SQLStatement);
-  setSupportedProperties(std::move(properties));
-
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Retry);
-  relationships.insert(Failure);
-  setSupportedRelationships(std::move(relationships));
-}
-
-void PutSQL::onSchedule(core::ProcessContext *context,
-                        core::ProcessSessionFactory* /*sessionFactory*/) {
-  context->getProperty(ConnectionURL.getName(), db_url_);
-
-  if (db_url_.empty()) {
-    logger_->log_error("Invalid Connection URL");
-  }
-
-  std::string batch_size;
-  context->getProperty(BatchSize.getName(), batch_size);
-
-  if (batch_size.empty()) {
-    batch_size_ = 100;
-  } else {
-    batch_size_ = std::stoull(batch_size);
-  }
-
-  context->getProperty(SQLStatement.getName(), sql_);
-}
-
-void PutSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                       const std::shared_ptr<core::ProcessSession> &session) {
-  auto flow_file = session->get();
-
-  if (!flow_file) {
-    return;
-  }
-
-  uint64_t batch_processed = 1;
-
-  try {
-    // Use an existing context, if one is available
-    std::shared_ptr<minifi::sqlite::SQLiteConnection> db;
-
-    if (conn_q_.try_dequeue(db)) {
-      logger_->log_debug("Using available SQLite connection");
-    }
-
-    if (!db) {
-      logger_->log_info("Creating new SQLite connection");
-      if (db_url_.substr(0, 9) == "sqlite://") {
-        db = std::make_shared<minifi::sqlite::SQLiteConnection>(db_url_.substr(9));
-      } else {
-        std::stringstream err_msg;
-        err_msg << "Connection URL '" << db_url_ << "' is unsupported";
-        logger_->log_error(err_msg.str().c_str());
-        throw std::runtime_error("Connection Error");
-      }
-    }
-
-    do {
-      auto sql = std::make_shared<std::string>();
-
-      if (sql_.empty()) {
-        // SQL is not defined as a property, so get SQL from the file content
-        SQLReadCallback cb(sql);
-        session->read(flow_file, &cb);
-      } else {
-        // SQL is defined as a property, so get the property dynamically w/ EL support
-        context->getProperty(SQLStatement, *sql, flow_file);
-      }
-
-      auto stmt = db->prepare(*sql);
-      for (uint64_t i = 1; i < UINT64_MAX; i++) {
-        std::string val;
-        std::stringstream val_key;
-        val_key << "sql.args." << i << ".value";
-
-        if (!flow_file->getAttribute(val_key.str(), val)) {
-          break;
-        }
-
-        stmt.bind_text(i, val);
-      }
-
-      stmt.step();
-
-      if (!stmt.is_ok()) {
-        logger_->log_error("SQL statement execution failed: %s", db->errormsg());
-        session->transfer(flow_file, Failure);
-      }
-
-      session->transfer(flow_file, Success);
-
-      flow_file = session->get();
-
-      if (!flow_file) {
-        logger_->log_info("Processed %d in batch", batch_processed);
-        break;
-      }
-
-      batch_processed++;
-    } while (batch_processed < batch_size_);
-
-    // Make connection available for use again
-    if (conn_q_.size_approx() < getMaxConcurrentTasks()) {
-      logger_->log_debug("Releasing SQLite connection");
-      conn_q_.enqueue(db);
-    } else {
-      logger_->log_info("Destroying SQLite connection because it is no longer needed");
-    }
-  } catch (std::exception &exception) {
-    logger_->log_error("Caught Exception %s", exception.what());
-    session->transfer(flow_file, Failure);
-    this->yield();
-  } catch (...) {
-    logger_->log_error("Caught Exception");
-    session->transfer(flow_file, Failure);
-    this->yield();
-  }
-}
-
-int64_t PutSQL::SQLReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  sql_->resize(stream->size());
-  auto num_read = static_cast<uint64_t >(stream->read(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
-                                                          static_cast<int>(stream->size())));
-
-  if (num_read != stream->size()) {
-    throw std::runtime_error("SQLReadCallback failed to fully read flow file input stream");
-  }
-
-  return num_read;
-}
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/sqlite/PutSQL.h b/extensions/sqlite/PutSQL.h
deleted file mode 100644
index f4992e4..0000000
--- a/extensions/sqlite/PutSQL.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef NIFI_MINIFI_CPP_PUTSQL_H
-#define NIFI_MINIFI_CPP_PUTSQL_H
-
-#include <core/Resource.h>
-#include <core/Processor.h>
-
-#include <concurrentqueue.h>
-
-#include "SQLiteConnection.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-
-class PutSQL : public core::Processor {
- public:
-  explicit PutSQL(const std::string &name, utils::Identifier uuid = utils::Identifier())
-      : Processor(name, uuid),
-        logger_(logging::LoggerFactory<PutSQL>::getLogger()),
-        batch_size_(100) {
-  }
-
-  static core::Property ConnectionURL;
-  static core::Property SQLStatement;
-  static core::Property BatchSize;
-
-  static core::Relationship Success;
-  static core::Relationship Retry;
-  static core::Relationship Failure;
-
-  void initialize() override;
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
-  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
-    logger_->log_error("onTrigger invocation with raw pointers is not implemented");
-  }
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                 const std::shared_ptr<core::ProcessSession> &session) override;
-
-  class SQLReadCallback : public InputStreamCallback {
-   public:
-    explicit SQLReadCallback(std::shared_ptr<std::string> sql)
-        : sql_(std::move(sql)) {
-    }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<std::string> sql_;
-  };
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  moodycamel::ConcurrentQueue<std::shared_ptr<minifi::sqlite::SQLiteConnection>> conn_q_;
-
-  uint64_t batch_size_;
-  std::string db_url_;
-  std::string sql_;
-};
-
-REGISTER_RESOURCE(PutSQL, "Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command to execute. "
-    "The SQL command may use the ? character to bind parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming "
-    "convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The content of the FlowFile is expected to be in UTF-8 format."); // NOLINT
-
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_PUTSQL_H
diff --git a/extensions/sqlite/SQLiteConnection.h b/extensions/sqlite/SQLiteConnection.h
deleted file mode 100644
index 266d729..0000000
--- a/extensions/sqlite/SQLiteConnection.h
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef NIFI_MINIFI_CPP_SQLITECONNECTION_H
-#define NIFI_MINIFI_CPP_SQLITECONNECTION_H
-
-#include <sqlite3.h>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sqlite {
-
-class SQLiteConnection;
-
-/**
- * RAII wrapper for a sqlite3 prepared statement
- */
-class SQLiteStatement {
- public:
-  SQLiteStatement(sqlite3 *db, const std::string &sql)
-      : logger_(logging::LoggerFactory<SQLiteConnection>::getLogger()) {
-    if (sqlite3_prepare_v3(db, sql.c_str(), sql.size(), 0, &stmt_, nullptr)) {
-      std::stringstream err_msg;
-      err_msg << "Failed to create prepared statement: ";
-      err_msg << sql;
-      err_msg << " because ";
-      err_msg << sqlite3_errmsg(db);
-      throw std::runtime_error(err_msg.str());
-    }
-
-    if (!stmt_) {
-      std::stringstream err_msg;
-      err_msg << "Failed to create prepared statement: ";
-      err_msg << sql;
-      err_msg << " because statement was NULL";
-      throw std::runtime_error(err_msg.str());
-    }
-
-    db_ = db;
-  }
-
-  ~SQLiteStatement() {
-    sqlite3_finalize(stmt_);
-  }
-
-  void bind_text(int pos, const std::string &text) {
-    if (sqlite3_bind_text(stmt_, pos, text.c_str(), text.size(), SQLITE_TRANSIENT)) {
-      std::stringstream err_msg;
-      err_msg << "Failed to bind text parameter"
-              << pos
-              << ": "
-              << text
-              << " because "
-              << sqlite3_errmsg(db_);
-      throw std::runtime_error(err_msg.str());
-    }
-  }
-
-  void bind_int64(int pos, uint64_t val) {
-    if (sqlite3_bind_int64(stmt_, pos, val)) {
-      std::stringstream err_msg;
-      err_msg << "Failed to bind int64 parameter"
-              << pos
-              << ": "
-              << val
-              << " because "
-              << sqlite3_errmsg(db_);
-      throw std::runtime_error(err_msg.str());
-    }
-  }
-
-  void bind_double(int pos, double val) {
-    if (sqlite3_bind_double(stmt_, pos, val)) {
-      std::stringstream err_msg;
-      err_msg << "Failed to bind double parameter"
-              << pos
-              << ": "
-              << val
-              << " because "
-              << sqlite3_errmsg(db_);
-      throw std::runtime_error(err_msg.str());
-    }
-  }
-
-  void bind_null(int pos) {
-    if (sqlite3_bind_null(stmt_, pos)) {
-      std::stringstream err_msg;
-      err_msg << "Failed to bind NULL parameter"
-              << pos
-              << " because "
-              << sqlite3_errmsg(db_);
-      throw std::runtime_error(err_msg.str());
-    }
-  }
-
-  void step() {
-    int rc = sqlite3_step(stmt_);
-    if (rc == SQLITE_BUSY) {
-      reset_flags();
-      is_ok_ = false;
-      is_busy_ = true;
-    } else if (rc == SQLITE_DONE) {
-      reset_flags();
-      is_done_ = true;
-    } else if (rc == SQLITE_ROW) {
-      reset_flags();
-      is_row_ = true;
-    } else {
-      is_ok_ = false;
-      is_error_ = true;
-      std::stringstream err_msg;
-      err_msg << "Failed to step statement because "
-              << sqlite3_errmsg(db_);
-      throw std::runtime_error(err_msg.str());
-    }
-  }
-
-  bool is_ok() {
-    return is_ok_;
-  }
-
-  bool is_done() {
-    return is_done_;
-  }
-
-  bool is_row() {
-    return is_row_;
-  }
-
-  bool is_error() {
-    return is_error_;
-  }
-
-  bool is_busy() {
-    return is_busy_;
-  }
-
-  std::string column_text(int col) {
-    return std::string(reinterpret_cast<const char *>(sqlite3_column_text(stmt_, col)));
-  }
-
-  uint64_t  column_int64(int col) {
-    return sqlite3_column_int64(stmt_, col);
-  }
-
-  double column_double(int col) {
-    return sqlite3_column_double(stmt_, col);
-  }
-
-  bool column_is_int(int col) {
-    return SQLITE_INTEGER == sqlite3_column_type(stmt_, col);
-  }
-
-  bool column_is_float(int col) {
-    return SQLITE_FLOAT == sqlite3_column_type(stmt_, col);
-  }
-
-  bool column_is_text(int col) {
-    return SQLITE_TEXT == sqlite3_column_type(stmt_, col);
-  }
-
-  bool column_is_blob(int col) {
-    return SQLITE_BLOB == sqlite3_column_type(stmt_, col);
-  }
-
-  bool column_is_null(int col) {
-    return SQLITE_NULL == sqlite3_column_type(stmt_, col);
-  }
-
-  std::string column_name(int col) {
-    return std::string(sqlite3_column_name(stmt_, col));
-  }
-
-  int column_count() {
-    return sqlite3_column_count(stmt_);
-  }
-
-  void reset() {
-    sqlite3_reset(stmt_);
-  }
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-
-  sqlite3_stmt *stmt_;
-  sqlite3 *db_ = nullptr;
-  bool is_ok_ = true;
-  bool is_busy_ = false;
-  bool is_done_ = false;
-  bool is_error_ = false;
-  bool is_row_ = false;
-
-  void reset_flags() {
-    is_ok_ = true;
-    is_busy_ = false;
-    is_done_ = false;
-    is_error_ = false;
-    is_row_ = false;
-  }
-};
-
-/**
- * RAII wrapper for a sqlite3 connection
- */
-class SQLiteConnection {
- public:
-  SQLiteConnection(const std::string &filename)
-      : logger_(logging::LoggerFactory<SQLiteConnection>::getLogger()),
-        filename_(filename) {
-    logger_->log_info("Opening SQLite database: %s", filename_);
-
-    if (sqlite3_open(filename_.c_str(), &db_)) {
-      std::stringstream err_msg("Failed to open database: ");
-      err_msg << filename_;
-      err_msg << " because ";
-      err_msg << sqlite3_errmsg(db_);
-      throw std::runtime_error(err_msg.str());
-    }
-  }
-
-  SQLiteConnection(SQLiteConnection &&other)
-      : logger_(std::move(other.logger_)),
-        filename_(std::move(other.filename_)),
-        db_(other.db_) {
-    other.db_ = nullptr;
-  }
-
-  ~SQLiteConnection() {
-    logger_->log_info("Closing SQLite database: %s", filename_);
-    sqlite3_close(db_);
-  }
-
-  SQLiteStatement prepare(const std::string sql) {
-    return SQLiteStatement(db_, sql);
-  }
-
-  std::string errormsg() {
-    return sqlite3_errmsg(db_);
-  }
-
- private:
-  std::shared_ptr<logging::Logger> logger_;
-  std::string filename_;
-
-  sqlite3 *db_ = nullptr;
-};
-
-} /* namespace sqlite */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif  // NIFI_MINIFI_CPP_SQLITECONNECTION_H
diff --git a/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp b/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp
index 57eb4bf..5fb73a0 100644
--- a/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp
+++ b/extensions/windows-event-log/tests/CWELCustomProviderTests.cpp
@@ -30,6 +30,7 @@
 #include "IntegrationTestUtils.h"
 
 #include "CWELTestUtils.h"
+#include "Utils.h"
 
 // generated from the manifest file "custom-provider/unit-test-provider.man"
 // using the command "mc -um unit-test-provider.man"
diff --git a/extensions/windows-event-log/tests/CWELTestUtils.h b/extensions/windows-event-log/tests/CWELTestUtils.h
index 3820ca5..fdccd5c 100644
--- a/extensions/windows-event-log/tests/CWELTestUtils.h
+++ b/extensions/windows-event-log/tests/CWELTestUtils.h
@@ -87,31 +87,3 @@ class OutputFormatTestController : public TestController {
   std::string output_format_;
   utils::optional<std::string> json_format_;
 };
-
-// carries out a loose match on objects, i.e. it doesn't matter if the
-// actual object has extra fields than expected
-void matchJSON(const rapidjson::Value& json, const rapidjson::Value& expected) {
-  if (expected.IsObject()) {
-    REQUIRE(json.IsObject());
-    for (const auto& expected_member : expected.GetObject()) {
-      REQUIRE(json.HasMember(expected_member.name));
-      matchJSON(json[expected_member.name], expected_member.value);
-    }
-  } else if (expected.IsArray()) {
-    REQUIRE(json.IsArray());
-    REQUIRE(json.Size() == expected.Size());
-    for (size_t idx{0}; idx < expected.Size(); ++idx) {
-      matchJSON(json[idx], expected[idx]);
-    }
-  } else {
-    REQUIRE(json == expected);
-  }
-}
-
-void verifyJSON(const std::string& json_str, const std::string& expected_str) {
-  rapidjson::Document json, expected;
-  REQUIRE_FALSE(json.Parse(json_str.c_str()).HasParseError());
-  REQUIRE_FALSE(expected.Parse(expected_str.c_str()).HasParseError());
-
-  matchJSON(json, expected);
-}
diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
index 165678d..16e22c0 100644
--- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
+++ b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
@@ -26,6 +26,7 @@
 #include "rapidjson/document.h"
 
 #include "CWELTestUtils.h"
+#include "Utils.h"
 
 using ConsumeWindowsEventLog = org::apache::nifi::minifi::processors::ConsumeWindowsEventLog;
 using LogAttribute = org::apache::nifi::minifi::processors::LogAttribute;
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 492dc7b..cd23e4f 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -84,6 +84,8 @@ class StringUtils {
 
   static utils::optional<bool> toBool(const std::string& input);
 
+  static std::string toLower(std::string str);
+
   // Trim String utils
 
   /**
@@ -298,7 +300,6 @@ class StringUtils {
     return join(std::basic_string<TChar>(separator), container);
   }
 
-
   /**
    * Concatenates string representation of integrals stored in an arbitrary container using the provided separator.
    * @tparam TChar char type of the string (char or wchar_t)
diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp
index c9c8c2f..d6d6efe 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -44,6 +44,11 @@ utils::optional<bool> StringUtils::toBool(const std::string& str) {
   return {};
 }
 
+std::string StringUtils::toLower(std::string str) {
+  std::transform(str.begin(), str.end(), str.begin(), [] (unsigned char c) {return std::tolower(c);});
+  return str;
+}
+
 std::string StringUtils::trim(const std::string& s) {
   return trimRight(trimLeft(s));
 }
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/test/Path.h
similarity index 53%
rename from extensions/sql/data/WriteCallback.h
rename to libminifi/test/Path.h
index 5336cd0..96a7c74 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/test/Path.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,32 +17,50 @@
 
 #pragma once
 
-#include <string>
-
-#include "FlowFileRecord.h"
+#include "utils/file/FileUtils.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
+namespace utils {
+
+class Path {
+ public:
+  Path() = default;
 
-class WriteCallback : public OutputStreamCallback {
-public:
-  explicit WriteCallback(const std::string& data)
-    : data_(data) {
+  Path(std::string value): value_(std::move(value)) {}
+
+  Path& operator=(const std::string& new_value) {
+    value_ = new_value;
+    return *this;
   }
 
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-    if (data_.empty())
-      return 0;
+  template<typename T>
+  Path& operator/=(T&& suffix) {
+    value_ = file::concat_path(value_, std::string{std::forward<T>(suffix)});
+    return *this;
+  }
+
+  template<typename T>
+  Path operator/(T&& suffix) {
+    return Path{file::concat_path(value_, std::string{std::forward<T>(suffix)})};
+  }
+
+  std::string str() const {
+    return value_;
+  }
 
-    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
+  explicit operator std::string() const {
+    return value_;
   }
 
- const std::string& data_;
+ private:
+  std::string value_;
 };
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index a5c1abb..94b7633 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -149,19 +149,28 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
 
 std::shared_ptr<minifi::Connection> TestPlan::addConnection(const std::shared_ptr<core::Processor>& source_proc, const core::Relationship& source_relationship, const std::shared_ptr<core::Processor>& destination_proc) {
   std::stringstream connection_name;
-  connection_name << source_proc->getUUIDStr() << "-to-" << destination_proc->getUUIDStr();
+  connection_name
+    << (source_proc ? source_proc->getUUIDStr().c_str() : "none")
+    << "-to-"
+    << (destination_proc ? destination_proc->getUUIDStr().c_str() : "none");
   std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str());
 
   connection->addRelationship(source_relationship);
 
   // link the connections so that we can test results at the end for this
-  connection->setSource(source_proc);
-  connection->setDestination(destination_proc);
 
-  connection->setSourceUUID(source_proc->getUUID());
-  connection->setDestinationUUID(destination_proc->getUUID());
-  source_proc->addConnection(connection);
-  if (source_proc != destination_proc) {
+  if (source_proc) {
+    connection->setSource(source_proc);
+    connection->setSourceUUID(source_proc->getUUID());
+  }
+  if (destination_proc) {
+    connection->setDestination(destination_proc);
+    connection->setDestinationUUID(destination_proc->getUUID());
+  }
+  if (source_proc) {
+    source_proc->addConnection(connection);
+  }
+  if (source_proc != destination_proc && destination_proc) {
     destination_proc->addConnection(connection);
   }
   relationships_.push_back(connection);
@@ -280,12 +289,12 @@ void TestPlan::scheduleProcessors() {
   }
 }
 
-bool TestPlan::runProcessor(const std::shared_ptr<core::Processor>& processor, std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+bool TestPlan::runProcessor(const std::shared_ptr<core::Processor>& processor, const PreTriggerVerifier& verify) {
   const std::size_t processor_location = std::distance(processor_queue_.begin(), getProcessorItByUuid(processor->getUUIDStr()));
   return runProcessor(gsl::narrow<int>(processor_location), verify);
 }
 
-bool TestPlan::runProcessor(int target_location, std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& verify) {
   if (!finalized) {
     finalize();
   }
@@ -310,13 +319,13 @@ bool TestPlan::runProcessor(int target_location, std::function<void(const std::s
   return gsl::narrow<size_t>(target_location + 1) < processor_queue_.size();
 }
 
-bool TestPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+bool TestPlan::runNextProcessor(const PreTriggerVerifier& verify) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   ++location;
   return runProcessor(location, verify);
 }
 
-bool TestPlan::runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> /*verify*/) {
+bool TestPlan::runCurrentProcessor(const PreTriggerVerifier& /*verify*/) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   return runProcessor(location);
 }
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index fd9bd36..6a9d510 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -50,6 +50,7 @@
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "core/state/nodes/FlowInformation.h"
 #include "utils/ClassUtils.h"
+#include "Path.h"
 
 class LogTestController {
  public:
@@ -251,6 +252,8 @@ class TestPlan {
   void increment_location() { ++location; }
   void reset_location() { location = -1; }
 
+  using PreTriggerVerifier = std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)>;
+
   std::vector<std::shared_ptr<core::Processor>>::iterator getProcessorItByUuid(const std::string& uuid);
   std::shared_ptr<core::ProcessContext> getProcessContextForProcessor(const std::shared_ptr<core::Processor>& processor);
 
@@ -260,10 +263,10 @@ class TestPlan {
 
   // Note: all this verify logic is only used in TensorFlow tests as a replacement for UpdateAttribute
   // It should probably not be the part of the standard way of running processors
-  bool runProcessor(const std::shared_ptr<core::Processor>& processor, std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
-  bool runProcessor(int target_location, std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
-  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
-  bool runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+  bool runProcessor(const std::shared_ptr<core::Processor>& processor, const PreTriggerVerifier& verify = nullptr);
+  bool runProcessor(size_t target_location, const PreTriggerVerifier& verify = nullptr);
+  bool runNextProcessor(const PreTriggerVerifier& verify = nullptr);
+  bool runCurrentProcessor(const PreTriggerVerifier& verify = nullptr);
   bool runCurrentProcessorUntilFlowfileIsProduced(const std::chrono::seconds& wait_duration);
 
   std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> getProvenanceRecords();
@@ -279,6 +282,10 @@ class TestPlan {
     return flow_repo_;
   }
 
+  std::shared_ptr<core::ContentRepository> getContentRepo() {
+    return content_repo_;
+  }
+
   std::shared_ptr<core::Repository> getProvenanceRepo() {
     return prov_repo_;
   }
@@ -295,6 +302,14 @@ class TestPlan {
     return state_manager_provider_;
   }
 
+  std::string getContent(const std::shared_ptr<core::FlowFile>& file) {
+    auto content_claim = file->getResourceClaim();
+    auto content_stream = content_repo_->read(*content_claim.get());
+    auto output_stream = std::make_shared<minifi::io::BufferStream>();
+    minifi::InputStreamPipe{output_stream}.process(content_stream);
+    return {reinterpret_cast<const char*>(output_stream->getBuffer()), output_stream->size()};
+  }
+
   void finalize();
 
  protected:
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 228a135..64560da 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <string>
+#include "rapidjson/document.h"
 
 #define FIELD_ACCESSOR(field) \
   template<typename T> \
@@ -29,3 +30,36 @@
   static auto call_##method(T&& instance, Args&& ...args) -> decltype((std::forward<T>(instance).method(std::forward<Args>(args)...))) { \
     return std::forward<T>(instance).method(std::forward<Args>(args)...); \
   }
+
+// carries out a loose match on objects, i.e. it doesn't matter if the
+// actual object has extra fields than expected
+inline void matchJSON(const rapidjson::Value& actual, const rapidjson::Value& expected) {
+  if (expected.IsObject()) {
+    REQUIRE(actual.IsObject());
+    for (const auto& expected_member : expected.GetObject()) {
+      REQUIRE(actual.HasMember(expected_member.name));
+      matchJSON(actual[expected_member.name], expected_member.value);
+    }
+  } else if (expected.IsArray()) {
+    REQUIRE(actual.IsArray());
+    REQUIRE(actual.Size() == expected.Size());
+    for (rapidjson::SizeType idx{0}; idx < expected.Size(); ++idx) {
+      matchJSON(actual[idx], expected[idx]);
+    }
+  } else {
+    REQUIRE(actual == expected);
+  }
+}
+
+inline void verifyJSON(const std::string& actual_str, const std::string& expected_str, bool strict = false) {
+  rapidjson::Document actual, expected;
+  REQUIRE_FALSE(actual.Parse(actual_str.c_str()).HasParseError());
+  REQUIRE_FALSE(expected.Parse(expected_str.c_str()).HasParseError());
+
+  if (strict) {
+    REQUIRE(actual == expected);
+  } else {
+    matchJSON(actual, expected);
+  }
+
+}
diff --git a/libminifi/test/sql-tests/CMakeLists.txt b/libminifi/test/sql-tests/CMakeLists.txt
new file mode 100644
index 0000000..ae3d1ee
--- /dev/null
+++ b/libminifi/test/sql-tests/CMakeLists.txt
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+file(GLOB SQL_TESTS  "*.cpp")
+
+set(SQL_TEST_COUNT 0)
+foreach(testfile ${SQL_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable(${testfilename} "${testfile}")
+
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/sql")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
+
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+    target_wholearchive_library(${testfilename} minifi-sql)
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
+    target_wholearchive_library(${testfilename} minifi-expression-language-extensions)
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+
+    math(EXPR SQL_TEST_COUNT "${SQL_TEST_COUNT}+1")
+endforeach()
+
+message("-- Finished building ${SQL_TEST_COUNT} sql test file(s)...")
diff --git a/libminifi/test/sql-tests/ExecuteSQLTests.cpp b/libminifi/test/sql-tests/ExecuteSQLTests.cpp
new file mode 100644
index 0000000..0d693c7
--- /dev/null
+++ b/libminifi/test/sql-tests/ExecuteSQLTests.cpp
@@ -0,0 +1,196 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "SQLTestController.h"
+#include "processors/ExecuteSQL.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("ExecuteSQL works without incoming flow file", "[ExecuteSQL1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in property", "[ExecuteSQL2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty("SQL select query", "SELECT * FROM test_table WHERE int_col = ${int_col_value}");
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({{"int_col_value", "11"}});
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses statement in content", "[ExecuteSQL3]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "one"}, {22, "two"}});
+
+  auto input_file = plan->addInput({}, "SELECT * FROM test_table ORDER BY int_col ASC");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "one"
+    },{
+      "int_col": 22,
+      "text_col": "two"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL uses sql.args.N.value attributes", "[ExecuteSQL4]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+
+  controller.insertValues({{11, "apple"}, {11, "banana"}, {22, "banana"}});
+
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "11"},
+    {"sql.args.2.value", "banana"}
+  }, "SELECT * FROM test_table WHERE int_col = ? AND text_col = ?");
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::ExecuteSQL::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "1");
+
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{
+      "int_col": 11,
+      "text_col": "banana"
+    }]
+  )");
+}
+
+TEST_CASE("ExecuteSQL honors Max Rows Per Flow File", "[ExecuteSQL5]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::ExecuteSQL::MaxRowsPerFlowFile.getName(), "2");
+  sql_proc->setProperty(processors::ExecuteSQL::SQLSelectQuery.getName(), "SELECT text_col FROM test_table ORDER BY int_col ASC");
+
+  controller.insertValues({
+    {101, "apple"},
+    {102, "banana"},
+    {103, "pear"},
+    {104, "strawberry"},
+    {105, "pineapple"}
+  });
+
+  auto input_file = plan->addInput();
+
+  plan->run();
+
+  auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
+    verifyJSON(plan->getContent(actual), expected);
+  };
+
+  FlowFileMatcher matcher{content_verifier, {
+      processors::ExecuteSQL::RESULT_ROW_COUNT,
+      processors::ExecuteSQL::FRAGMENT_COUNT,
+      processors::ExecuteSQL::FRAGMENT_INDEX,
+      processors::ExecuteSQL::FRAGMENT_IDENTIFIER
+  }};
+
+  utils::optional<std::string> fragment_id;
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 3);
+  matcher.verify(flow_files[0],
+    {"2", "3", "0", capture(fragment_id)},
+    R"([{"text_col": "apple"}, {"text_col": "banana"}])");
+  REQUIRE(fragment_id);
+  matcher.verify(flow_files[1],
+    {"2", "3", "1", *fragment_id},
+    R"([{"text_col": "pear"}, {"text_col": "strawberry"}])");
+  matcher.verify(flow_files[2],
+    {"1", "3", "2", *fragment_id},
+    R"([{"text_col": "pineapple"}])");
+}
+
+TEST_CASE("ExecuteSQL incoming flow file is malformed", "[ExecuteSQL6]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("ExecuteSQL", {{"success", "d"}, {"failure", "d"}});
+
+  auto input_file = plan->addInput({}, "not a valid sql statement");
+
+  REQUIRE_THROWS(plan->run());
+}
diff --git a/libminifi/test/sql-tests/FlowFileMatcher.h b/libminifi/test/sql-tests/FlowFileMatcher.h
new file mode 100644
index 0000000..96091ec
--- /dev/null
+++ b/libminifi/test/sql-tests/FlowFileMatcher.h
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+#include "core/FlowFile.h"
+#include <functional>
+
+struct AttributeValue {
+  AttributeValue(std::string value)
+    : value{std::move(value)} {}
+
+  AttributeValue(const char* value)
+      : value{value} {}
+
+  AttributeValue(std::string value, utils::optional<std::string>& capture)
+      : value{std::move(value)}, capture{&capture} {}
+
+  std::string value;
+  utils::optional<std::string>* capture{nullptr};
+};
+
+inline AttributeValue capture(utils::optional<std::string>& value) {
+  return {"", value};
+}
+
+using ContentMatcher = std::function<void(const std::shared_ptr<core::FlowFile>& actual, const std::string& expected)>;
+
+class FlowFileMatcher {
+ public:
+  FlowFileMatcher(ContentMatcher content_matcher, std::vector<std::string> attribute_names)
+    : content_matcher_{std::move(content_matcher)},
+      attribute_names_{std::move(attribute_names)} {}
+
+  void verify(const std::shared_ptr<core::FlowFile>& actual_file, const std::vector<AttributeValue>& expected_attributes, const std::string& expected_content) {
+    REQUIRE(expected_attributes.size() == attribute_names_.size());
+    for (size_t idx = 0; idx < attribute_names_.size(); ++idx) {
+      const std::string& attribute_name = attribute_names_[idx];
+      std::string actual_value;
+      REQUIRE(actual_file->getAttribute(attribute_name, actual_value));
+
+      const auto& expected_value = expected_attributes[idx];
+      if (expected_value.capture != nullptr) {
+        *expected_value.capture = actual_value;
+      } else {
+        // simple value
+        REQUIRE(expected_value.value == actual_value);
+      }
+    }
+
+    // verify content
+    content_matcher_(actual_file, expected_content);
+  }
+
+ private:
+  ContentMatcher content_matcher_;
+  std::vector<std::string> attribute_names_;
+};
diff --git a/libminifi/test/sql-tests/PutSQLTests.cpp b/libminifi/test/sql-tests/PutSQLTests.cpp
new file mode 100644
index 0000000..e5e19cb
--- /dev/null
+++ b/libminifi/test/sql-tests/PutSQLTests.cpp
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "../TestBase.h"
+#include "SQLTestController.h"
+
+#include "processors/PutSQL.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/LogAttribute.h"
+#include "processors/GetFile.h"
+
+TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") {
+  TestController testController;
+  std::shared_ptr<core::Processor>
+      processor = std::make_shared<org::apache::nifi::minifi::processors::PutSQL>("processorname");
+  REQUIRE(processor->getName() == "processorname");
+}
+
+TEST_CASE("Test Put", "[PutSQLPut]") {
+  SQLTestController testController;
+
+  auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "42"},
+    {"sql.args.2.value", "asdf"}
+  });
+
+  sql_proc->setProperty(
+      "SQL Statement",
+      "INSERT INTO test_table (int_col, text_col) VALUES (?, ?)");
+
+  plan->run();
+
+  // Verify output state
+  auto rows = testController.fetchValues();
+  REQUIRE(rows.size() == 1);
+  REQUIRE(rows[0].int_col == 42);
+  REQUIRE(rows[0].text_col == "asdf");
+}
+
+TEST_CASE("Test Put Content", "[PutSQLPutContent]") {
+  SQLTestController testController;
+
+  auto plan = testController.createSQLPlan("PutSQL", {{"success", "d"}});
+  auto input_file = plan->addInput({
+    {"sql.args.1.value", "4242"},
+    {"sql.args.2.value", "fdsa"}
+  }, "INSERT INTO test_table VALUES(?, ?);");
+
+  plan->run();
+
+  // Verify output state
+  auto rows = testController.fetchValues();
+  REQUIRE(rows.size() == 1);
+  REQUIRE(rows[0].int_col == 4242);
+  REQUIRE(rows[0].text_col == "fdsa");
+}
+
diff --git a/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp b/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp
new file mode 100644
index 0000000..095d079
--- /dev/null
+++ b/libminifi/test/sql-tests/QueryDatabaseTableTests.cpp
@@ -0,0 +1,251 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include "../TestBase.h"
+#include "SQLTestController.h"
+#include "Utils.h"
+#include "FlowFileMatcher.h"
+
+TEST_CASE("QueryDatabaseTable queries the table and returns specified columns", "[QueryDatabaseTable1]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"}
+  });
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}]
+  )", true);
+}
+
+TEST_CASE("QueryDatabaseTable requerying the table returns only new rows", "[QueryDatabaseTable2]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"}
+  });
+
+  plan->run();
+
+  auto first_flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(first_flow_files.size() == 1);
+
+  controller.insertValues({
+    {104, "four"},
+    {105, "five"}
+  });
+
+  SECTION("Run onTrigger only") {plan->run();}
+  SECTION("Run both onSchedule and onTrigger") {plan->run(true);}
+
+  auto second_flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(second_flow_files.size() == 1);
+
+  std::string row_count;
+  second_flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+  auto content = plan->getContent(second_flow_files[0]);
+  verifyJSON(content, R"(
+    [{"text_col": "four"}, {"text_col": "five"}]
+  )", true);
+}
+
+TEST_CASE("QueryDatabaseTable specifying initial max values", "[QueryDatabaseTable3]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+  sql_proc->setDynamicProperty("initial.maxvalue.int_col", "102");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"},
+    {104, "four"}
+  });
+
+  plan->run();
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "2");
+  auto content = plan->getContent(flow_files[0]);
+  verifyJSON(content, R"(
+    [{"text_col": "three"}, {"text_col": "four"}]
+  )", true);
+}
+
+TEST_CASE("QueryDatabaseTable honors Max Rows Per Flow File and sets output attributes", "[QueryDatabaseTable4]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxRowsPerFlowFile.getName(), "3");
+
+  controller.insertValues({
+    {101, "one"},
+    {102, "two"},
+    {103, "three"},
+    {104, "four"},
+    {105, "five"}
+  });
+
+  plan->run();
+
+  auto content_verifier = [&] (const std::shared_ptr<core::FlowFile>& actual, const std::string& expected) {
+    verifyJSON(plan->getContent(actual), expected, true);
+  };
+
+  FlowFileMatcher matcher(content_verifier, {
+      processors::QueryDatabaseTable::RESULT_TABLE_NAME,
+      processors::QueryDatabaseTable::RESULT_ROW_COUNT,
+      processors::QueryDatabaseTable::FRAGMENT_COUNT,
+      processors::QueryDatabaseTable::FRAGMENT_INDEX,
+      processors::QueryDatabaseTable::FRAGMENT_IDENTIFIER,
+      "maxvalue.int_col"
+  });
+
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 2);
+
+  utils::optional<std::string> fragment_id;
+
+  matcher.verify(flow_files[0],
+    {"test_table", "3", "2", "0", capture(fragment_id), "105"},
+    R"([{"text_col": "one"}, {"text_col": "two"}, {"text_col": "three"}])");
+  REQUIRE(fragment_id);
+  matcher.verify(flow_files[1],
+    {"test_table", "2", "2", "1", *fragment_id, "105"},
+    R"([{"text_col": "four"}, {"text_col": "five"}])");
+}
+
+TEST_CASE("QueryDatabaseTable changing table name resets state", "[QueryDatabaseTable5]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+      {101, "one"},
+      {102, "two"},
+      {103, "three"}
+  });
+
+  // query "test_table"
+  plan->run();
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+
+
+  // query "empty_test_table"
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "empty_test_table");
+  plan->run(true);
+  flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 0);
+
+  // again query "test_table", by now the stored state is reset, so all rows are returned
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  plan->run(true);
+  flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+}
+
+TEST_CASE("QueryDatabaseTable changing maximum value columns resets state", "[QueryDatabaseTable6]") {
+  SQLTestController controller;
+
+  auto plan = controller.createSQLPlan("QueryDatabaseTable", {{"success", "d"}});
+  auto sql_proc = plan->getSQLProcessor();
+  sql_proc->setProperty(processors::QueryDatabaseTable::TableName.getName(), "test_table");
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  sql_proc->setProperty(processors::QueryDatabaseTable::ColumnNames.getName(), "text_col");
+
+  controller.insertValues({
+      {101, "one"},
+      {102, "two"},
+      {103, "three"}
+  });
+
+  // query using ["int_col"] as max value columns
+  plan->run();
+  auto flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  std::string row_count;
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+
+
+  // query using ["int_col", "text_col"] as max value columns
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col, text_col");
+  plan->run(true);
+  flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+
+  // query using ["int_col"] as max value columns again
+  sql_proc->setProperty(processors::QueryDatabaseTable::MaxValueColumnNames.getName(), "int_col");
+  plan->run(true);
+  flow_files = plan->getOutputs({"success", "d"});
+  REQUIRE(flow_files.size() == 1);
+  flow_files[0]->getAttribute(processors::QueryDatabaseTable::RESULT_ROW_COUNT, row_count);
+  REQUIRE(row_count == "3");
+}
diff --git a/libminifi/test/sql-tests/SQLTestController.h b/libminifi/test/sql-tests/SQLTestController.h
new file mode 100644
index 0000000..a3114bc
--- /dev/null
+++ b/libminifi/test/sql-tests/SQLTestController.h
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+#include "processors/PutSQL.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/LogAttribute.h"
+#include "processors/GetFile.h"
+#include "processors/ExecuteSQL.h"
+#include "processors/QueryDatabaseTable.h"
+#include "SQLTestPlan.h"
+
+#include "services/ODBCConnector.h"
+
+struct TableRow {
+  int64_t int_col;
+  std::string text_col;
+};
+
+#ifdef WIN32
+const std::string DRIVER = "{SQLite3 ODBC Driver}";
+#else
+const std::string DRIVER = "libsqlite3odbc.so";
+#endif
+
+class SQLTestController : public TestController {
+ public:
+  SQLTestController() {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
+    LogTestController::getInstance().setTrace<processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<processors::PutSQL>();
+    LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
+    LogTestController::getInstance().setTrace<processors::QueryDatabaseTable>();
+
+    char format[] = "/var/tmp/gt.XXXXXX";
+    test_dir_ = createTempDirectory(format);
+    database_ = test_dir_ / "test.db";
+    connection_str_ = "Driver=" + DRIVER + ";Database=" + database_.str();
+
+    // Create test dbs
+    minifi::sql::controllers::ODBCConnection{connection_str_}.prepareStatement("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);")->execute();
+    minifi::sql::controllers::ODBCConnection{connection_str_}.prepareStatement("CREATE TABLE empty_test_table (int_col INTEGER, text_col TEXT);")->execute();
+  }
+
+  std::shared_ptr<SQLTestPlan> createSQLPlan(const std::string& sql_processor, std::initializer_list<core::Relationship> outputs) {
+    return std::make_shared<SQLTestPlan>(*this, connection_str_, sql_processor, outputs);
+  }
+
+  void insertValues(std::initializer_list<TableRow> values) {
+    minifi::sql::controllers::ODBCConnection connection{connection_str_};
+    for (const auto& value : values) {
+      connection.prepareStatement("INSERT INTO test_table (int_col, text_col) VALUES (?, ?);")
+          ->execute({std::to_string(value.int_col), value.text_col});
+    }
+  }
+
+  std::vector<TableRow> fetchValues() {
+    std::vector<TableRow> rows;
+    minifi::sql::controllers::ODBCConnection connection{connection_str_};
+    auto soci_rowset = connection.prepareStatement("SELECT * FROM test_table;")->execute();
+    for (const auto& soci_row : soci_rowset) {
+      rows.push_back(TableRow{get_column_cast<int64_t>(soci_row, "int_col"), soci_row.get<std::string>("text_col")});
+    }
+    return rows;
+  }
+
+  utils::Path getDB() const {
+    return database_;
+  }
+
+ private:
+  template<typename T>
+  T get_column_cast(const soci::row& row, const std::string& column_name) {
+    const auto& column_props = row.get_properties(column_name);
+    switch (const auto data_type = column_props.get_data_type()) {
+      case soci::data_type::dt_integer:
+        return gsl::narrow<T>(row.get<int>(column_name));
+      case soci::data_type::dt_long_long:
+        return gsl::narrow<T>(row.get<long long>(column_name));
+      case soci::data_type::dt_unsigned_long_long:
+        return gsl::narrow<T>(row.get<unsigned long long>(column_name));
+      default:
+        throw std::logic_error("Unknown data type for column \"" + column_name + "\"");
+    }
+  }
+
+  utils::Path test_dir_;
+  utils::Path database_;
+  std::string connection_str_;
+};
diff --git a/libminifi/test/sql-tests/SQLTestPlan.h b/libminifi/test/sql-tests/SQLTestPlan.h
new file mode 100644
index 0000000..80e430d
--- /dev/null
+++ b/libminifi/test/sql-tests/SQLTestPlan.h
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../TestBase.h"
+
+/**
+ * Minimal single-processor test harness: wires one SQL processor to an
+ * "ODBCService" controller service, one input connection, and one output
+ * connection per requested relationship.
+ */
+class SQLTestPlan {
+ public:
+  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
+    plan_ = controller.createPlan();
+    processor_ = plan_->addProcessor(sql_processor, sql_processor);
+    plan_->setProperty(processor_, "DB Controller Service", "ODBCService");
+    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
+    for (const auto& output_rel : output_rels) {
+      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
+    }
+
+    // initialize database service
+    auto service = plan_->addController("ODBCService", "ODBCService");
+    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString.getName(), connection_str);
+  }
+
+  // Reads back the content claim of the given flow file.
+  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
+    return plan_->getContent(flow_file);
+  }
+
+  // Creates a flow file with the given attributes and optional content and
+  // enqueues it on the processor's input connection. Returns the flow file.
+  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const utils::optional<std::string>& content = {}) {
+    auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+    for (const auto& attr : attributes) {
+      flow_file->setAttribute(attr.first, attr.second);
+    }
+    if (content) {
+      auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo());
+      auto content_stream = plan_->getContentRepo()->write(*claim);
+      int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length());
+      // Rule out a negative (error — assumed, TODO confirm stream contract)
+      // return first, then compare in the unsigned domain to avoid a
+      // signed/unsigned comparison between int and string::size_type.
+      REQUIRE(ret >= 0);
+      REQUIRE(static_cast<std::string::size_type>(ret) == content->length());
+      flow_file->setOffset(0);
+      flow_file->setSize(content->length());
+      flow_file->setResourceClaim(claim);
+    }
+    input_->put(flow_file);
+    return flow_file;
+  }
+
+  std::shared_ptr<core::Processor> getSQLProcessor() {
+    return processor_;
+  }
+
+  // Triggers the SQL processor once; with reschedule = true the plan is
+  // reset (and the processor rescheduled) before running.
+  void run(bool reschedule = false) {
+    if (reschedule) {
+      plan_->reset(reschedule);
+    }
+    plan_->runProcessor(0);  // run the one and only sql processor
+  }
+
+  // Polls all flow files queued on the connection registered for the given
+  // relationship. Fails the test if the relationship was not requested in
+  // the constructor, or if any polled flow file has expired.
+  std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) {
+    // use find() instead of operator[] so an unknown relationship does not
+    // default-insert a null connection into outputs_ before failing
+    const auto conn_it = outputs_.find(relationship);
+    REQUIRE(conn_it != outputs_.end());
+    const auto& conn = conn_it->second;
+    REQUIRE(conn);
+    std::vector<std::shared_ptr<core::FlowFile>> flow_files;
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    while (auto flow_file = conn->poll(expired)) {
+      REQUIRE(expired.empty());
+      flow_files.push_back(std::move(flow_file));
+    }
+    REQUIRE(expired.empty());
+    return flow_files;
+  }
+
+ private:
+  std::shared_ptr<TestPlan> plan_;
+  std::shared_ptr<core::Processor> processor_;
+  std::shared_ptr<minifi::Connection> input_;
+  std::map<core::Relationship, std::shared_ptr<minifi::Connection>> outputs_;
+};
diff --git a/libminifi/test/sqlite-tests/CMakeLists.txt b/libminifi/test/sqlite-tests/CMakeLists.txt
deleted file mode 100644
index c31609c..0000000
--- a/libminifi/test/sqlite-tests/CMakeLists.txt
+++ /dev/null
@@ -1,39 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB SQLITE_INTEGRATION_TESTS "*.cpp")
-SET(SQLITE-EXTENSIONS_TEST_COUNT 0)
-FOREACH(testfile ${SQLITE_INTEGRATION_TESTS})
-  get_filename_component(testfilename "${testfile}" NAME_WE)
-  add_executable("${testfilename}" "${testfile}")
-  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
-  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors/processors")
-  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/sqlite")
-  target_include_directories(${testfilename} SYSTEM PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/thirdparty/sqlite")
-
-  target_wholearchive_library(${testfilename} minifi-sqlite-extensions)
-  target_wholearchive_library(${testfilename} minifi-standard-processors)
-
-  createTests("${testfilename}")
-  target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
-#  target_link_libraries(minifi-sqlite-extensions sqlite)
-  MATH(EXPR SQLITE-EXTENSIONS_TEST_COUNT "${SQLITE-EXTENSIONS_TEST_COUNT}+1")
-  add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
-ENDFOREACH()
-message("-- Finished building ${SQLITE-EXTENSIONS_TEST_COUNT} SQLite related test file(s)...")
diff --git a/libminifi/test/sqlite-tests/SQLiteTests.cpp b/libminifi/test/sqlite-tests/SQLiteTests.cpp
deleted file mode 100644
index 6ba6eee..0000000
--- a/libminifi/test/sqlite-tests/SQLiteTests.cpp
+++ /dev/null
@@ -1,472 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <fstream>
-#include <map>
-#include <memory>
-#include <set>
-#include <iostream>
-#include <GenerateFlowFile.h>
-#include <UpdateAttribute.h>
-#include <LogAttribute.h>
-#include <ExecuteSQL.h>
-
-#include "../TestBase.h"
-
-#include "processors/GetFile.h"
-#include "processors/PutFile.h"
-
-#include "PutSQL.h"
-
-TEST_CASE("Test Creation of PutSQL", "[PutSQLCreate]") {  // NOLINT
-  TestController testController;
-  std::shared_ptr<core::Processor>
-      processor = std::make_shared<org::apache::nifi::minifi::processors::PutSQL>("processorname");
-  REQUIRE(processor->getName() == "processorname");
-}
-
-TEST_CASE("Test Put", "[PutSQLPut]") {  // NOLINT
-  TestController testController;
-
-  LogTestController::getInstance().setTrace<TestPlan>();
-  LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
-  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
-  LogTestController::getInstance().setTrace<processors::LogAttribute>();
-  LogTestController::getInstance().setTrace<processors::PutSQL>();
-
-  auto plan = testController.createPlan();
-  auto repo = std::make_shared<TestRepository>();
-
-  // Define directory for test db
-  std::string test_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
-
-  // Define test db file
-  std::string test_db(test_dir);
-  test_db.append("/test.db");
-
-  // Create test db
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-  }
-
-  // Build MiNiFi processing graph
-  auto generate = plan->addProcessor(
-      "GenerateFlowFile",
-      "Generate");
-  auto update = plan->addProcessor(
-      "UpdateAttribute",
-      "Update",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      update,
-      "sql.args.1.value",
-      "42",
-      true);
-  plan->setProperty(
-      update,
-      "sql.args.2.value",
-      "asdf",
-      true);
-  auto log = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-  auto put = plan->addProcessor(
-      "PutSQL",
-      "PutSQL",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      put,
-      "Connection URL",
-      "sqlite://" + test_db);
-  plan->setProperty(
-      put,
-      "SQL Statement",
-      "INSERT INTO test_table (int_col, text_col) VALUES (?, ?)");
-
-  plan->runNextProcessor();  // Generate
-  plan->runNextProcessor();  // Update
-  plan->runNextProcessor();  // Log
-  plan->runNextProcessor();  // PutSQL
-
-  // Verify output state
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("SELECT int_col, text_col FROM test_table;");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-    REQUIRE(42 == stmt.column_int64(0));
-    REQUIRE("asdf" == stmt.column_text(1));
-  }
-}
-
-TEST_CASE("Test Put Content", "[PutSQLPutContent]") {  // NOLINT
-  TestController testController;
-
-  LogTestController::getInstance().setTrace<TestPlan>();
-  LogTestController::getInstance().setTrace<processors::GetFile>();
-  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
-  LogTestController::getInstance().setTrace<processors::LogAttribute>();
-  LogTestController::getInstance().setTrace<processors::PutSQL>();
-
-  auto plan = testController.createPlan();
-  auto repo = std::make_shared<TestRepository>();
-
-  // Define directory for test db
-  std::string test_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
-
-  // Define test db file
-  std::string test_db(test_dir);
-  test_db.append("/test.db");
-
-  // Define directory for test input file
-  std::string test_in_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_in_dir[0]).empty());
-
-  // Define test input file
-  std::string test_file(test_in_dir);
-  test_file.append("/test.in");
-
-  // Write test SQL content
-  {
-    std::ofstream os(test_file);
-    os << "INSERT INTO test_table VALUES(?, ?);";
-  }
-
-  // Create test db
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-  }
-
-  // Build MiNiFi processing graph
-  auto get_file = plan->addProcessor(
-      "GetFile",
-      "Get");
-  plan->setProperty(
-      get_file,
-      processors::GetFile::Directory.getName(), test_in_dir);
-  auto update = plan->addProcessor(
-      "UpdateAttribute",
-      "Update",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      update,
-      "sql.args.1.value",
-      "4242",
-      true);
-  plan->setProperty(
-      update,
-      "sql.args.2.value",
-      "fdsa",
-      true);
-  auto log = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-  auto put = plan->addProcessor(
-      "PutSQL",
-      "PutSQL",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      put,
-      "Connection URL",
-      "sqlite://" + test_db);
-
-  plan->runNextProcessor();  // Get
-  plan->runNextProcessor();  // Update
-  plan->runNextProcessor();  // Log
-  plan->runNextProcessor();  // PutSQL
-
-  // Verify output state
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("SELECT int_col, text_col FROM test_table;");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-    REQUIRE(4242 == stmt.column_int64(0));
-    REQUIRE("fdsa" == stmt.column_text(1));
-  }
-}
-
-TEST_CASE("Test Exec", "[ExecuteSQL]") {  // NOLINT
-  TestController testController;
-
-  LogTestController::getInstance().setTrace<TestPlan>();
-  LogTestController::getInstance().setTrace<processors::LogAttribute>();
-  LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
-
-  auto plan = testController.createPlan();
-  auto repo = std::make_shared<TestRepository>();
-
-  // Define directory for test db
-  std::string test_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
-
-  // Define test db file
-  std::string test_db(test_dir);
-  test_db.append("/test.db");
-
-  // Create test db
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("CREATE TABLE test_table (int_col INTEGER, text_col TEXT);");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-
-    // Insert test data
-    auto stmt2 = db.prepare("INSERT INTO test_table (int_col, text_col) VALUES (42, 'asdf');");
-    stmt2.step();
-    REQUIRE(stmt2.is_ok());
-  }
-
-  // Build MiNiFi processing graph
-  auto exec = plan->addProcessor(
-      "ExecuteSQL",
-      "ExecuteSQL");
-  plan->setProperty(
-      exec,
-      "Connection URL",
-      "sqlite://" + test_db);
-  plan->setProperty(
-      exec,
-      "SQL Statement",
-      "SELECT * FROM test_table;");
-  auto log = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-
-  plan->runNextProcessor();  // Exec
-  plan->runNextProcessor();  // Log
-
-  // Verify output state
-  REQUIRE(LogTestController::getInstance().contains("key:int_col value:42"));
-  REQUIRE(LogTestController::getInstance().contains("key:text_col value:asdf"));
-}
-
-TEST_CASE("Test Exec 2", "[ExecuteSQL2]") {  // NOLINT
-  TestController testController;
-
-  LogTestController::getInstance().setTrace<TestPlan>();
-  LogTestController::getInstance().setTrace<processors::LogAttribute>();
-  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
-  LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
-  LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
-
-  auto plan = testController.createPlan();
-  auto repo = std::make_shared<TestRepository>();
-
-  // Define directory for test db
-  std::string test_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
-
-  // Define test db file
-  std::string test_db(test_dir);
-  test_db.append("/test.db");
-
-  // Create test db
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("CREATE TABLE test_table (id_col INTEGER, int_col INTEGER, text_col TEXT);");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-
-    // Insert test data
-    {
-      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (1, 33, 'aaaa');");
-      ins.step();
-      REQUIRE(ins.is_ok());
-    }
-    {
-      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (2, 42, 'bbbb');");
-      ins.step();
-      REQUIRE(ins.is_ok());
-    }
-    {
-      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (3, 24, 'cccc');");
-      ins.step();
-      REQUIRE(ins.is_ok());
-    }
-  }
-
-  // Build MiNiFi processing graph
-  auto generate = plan->addProcessor(
-      "GenerateFlowFile",
-      "Generate");
-  auto update = plan->addProcessor(
-      "UpdateAttribute",
-      "Update",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      update,
-      "sql.args.1.value",
-      "2",
-      true);
-  auto exec = plan->addProcessor(
-      "ExecuteSQL",
-      "ExecuteSQL",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      exec,
-      "Connection URL",
-      "sqlite://" + test_db);
-  plan->setProperty(
-      exec,
-      "SQL Statement",
-      "SELECT * FROM test_table WHERE id_col = ?;");
-  std::set<core::Relationship> auto_term_rels;
-  core::Relationship original("original", "");
-  auto_term_rels.insert(original);
-  exec->setAutoTerminatedRelationships(auto_term_rels);
-  auto log = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-
-  plan->runNextProcessor();  // Gen
-  plan->runNextProcessor();  // Update
-  plan->runNextProcessor();  // Exec
-  plan->runNextProcessor();  // Log
-
-  // Verify output state
-  REQUIRE(LogTestController::getInstance().contains("key:id_col value:2"));
-  REQUIRE(LogTestController::getInstance().contains("key:int_col value:42"));
-  REQUIRE(LogTestController::getInstance().contains("key:text_col value:bbbb"));
-}
-
-TEST_CASE("Test Exec 3", "[ExecuteSQL3]") {  // NOLINT
-  TestController testController;
-
-  LogTestController::getInstance().setTrace<TestPlan>();
-  LogTestController::getInstance().setTrace<processors::GetFile>();
-  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
-  LogTestController::getInstance().setTrace<processors::LogAttribute>();
-  LogTestController::getInstance().setTrace<processors::ExecuteSQL>();
-
-  auto plan = testController.createPlan();
-  auto repo = std::make_shared<TestRepository>();
-
-  // Define directory for test db
-  std::string test_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_dir[0]).empty());
-
-  // Define test db file
-  std::string test_db(test_dir);
-  test_db.append("/test.db");
-
-  // Define directory for test input file
-  std::string test_in_dir("/tmp/gt.XXXXXX");
-  REQUIRE(!testController.createTempDirectory(&test_in_dir[0]).empty());
-
-  // Define test input file
-  std::string test_file(test_in_dir);
-  test_file.append("/test.in");
-
-  // Write test SQL content
-  {
-    std::ofstream os(test_file);
-    os << "SELECT text_col FROM test_table WHERE id_col = ?;";
-  }
-
-  // Create test db
-  {
-    minifi::sqlite::SQLiteConnection db(test_db);
-    auto stmt = db.prepare("CREATE TABLE test_table (id_col INTEGER, int_col INTEGER, text_col TEXT);");
-    stmt.step();
-    REQUIRE(stmt.is_ok());
-
-    // Insert test data
-    {
-      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (1, 33, 'aaaa');");
-      ins.step();
-      REQUIRE(ins.is_ok());
-    }
-    {
-      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (2, 42, 'bbbb');");
-      ins.step();
-      REQUIRE(ins.is_ok());
-    }
-    {
-      auto ins = db.prepare("INSERT INTO test_table (id_col, int_col, text_col) VALUES (3, 24, 'cccc');");
-      ins.step();
-      REQUIRE(ins.is_ok());
-    }
-  }
-
-  // Build MiNiFi processing graph
-  auto get_file = plan->addProcessor(
-      "GetFile",
-      "Get");
-  plan->setProperty(
-      get_file,
-      processors::GetFile::Directory.getName(), test_in_dir);
-  auto update = plan->addProcessor(
-      "UpdateAttribute",
-      "Update",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      update,
-      "sql.args.1.value",
-      "2",
-      true);
-  auto exec = plan->addProcessor(
-      "ExecuteSQL",
-      "ExecuteSQL",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      exec,
-      "Connection URL",
-      "sqlite://" + test_db);
-  std::set<core::Relationship> auto_term_rels;
-  core::Relationship original("original", "");
-  auto_term_rels.insert(original);
-  exec->setAutoTerminatedRelationships(auto_term_rels);
-  auto log = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-
-  plan->runNextProcessor();  // Get
-  plan->runNextProcessor();  // Update
-  plan->runNextProcessor();  // Exec
-  plan->runNextProcessor();  // Log
-
-  // Verify output state
-  REQUIRE(LogTestController::getInstance().contains("key:text_col value:bbbb"));
-}