You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/17 00:21:46 UTC

[nifi-minifi-cpp] branch main updated (e63a287fe -> 2c7f989ae)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from e63a287fe MINIFICPP-1997 Fix transiently failing FileSystemRepositoryTests
     new ce288b920 MINIFICPP-2035 NiFi flow json format support
     new 5a9c1c334 MINIFICPP-2034 Cache SID lookups in CWEL
     new 7e8c5c49c MINIFICPP-2045 Synchronous flow file reloading
     new 2c7f989ae MINIFICPP-2007 Add rocksdb compression options

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CMakeLists.txt                                     |   6 +
 CONFIGURE.md                                       |  10 +-
 LICENSE                                            | 102 +++++++--
 NOTICE                                             |   2 +
 PROCESSORS.md                                      |   1 +
 cmake/BundledRocksDB.cmake                         |  35 +++-
 cmake/BundledZLIB.cmake                            |   1 -
 cmake/LZ4.cmake                                    |  53 +++++
 cmake/Zstd.cmake                                   |  56 +++++
 .../FindBZip2.cmake => lz4/dummy/Findlz4.cmake}    |  24 +--
 .../FindBZip2.cmake => zstd/dummy/Findzstd.cmake}  |  24 +--
 conf/minifi.properties                             |   3 +-
 .../integration/MiNiFi_integration_test_driver.py  |   3 +
 docker/test/integration/cluster/ContainerStore.py  |   3 +
 .../test/integration/cluster/DockerTestCluster.py  |   3 +
 .../cluster/containers/MinifiContainer.py          |   9 +-
 docker/test/integration/features/s2s.feature       |   7 +-
 docker/test/integration/features/script.feature    |   3 +-
 docker/test/integration/features/sql.feature       |   3 +-
 .../Minifi_flow_json_serializer.py                 | 141 +++++++++++++
 docker/test/integration/steps/steps.py             |   5 +
 extensions/http-curl/protocols/RESTSender.cpp      |   1 +
 extensions/libarchive/CMakeLists.txt               |   6 -
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  38 +++-
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +
 extensions/rocksdb-repos/FlowFileRepository.cpp    | 121 +++--------
 extensions/rocksdb-repos/FlowFileRepository.h      |  12 --
 extensions/rocksdb-repos/database/RocksDbUtils.cpp |  54 +++++
 extensions/rocksdb-repos/database/RocksDbUtils.h   |  21 +-
 .../rocksdb-repos/database/StringAppender.cpp      |   2 +-
 .../tests/unit/AdaptiveConfigurationTests.cpp      |  66 ++++++
 .../tests/unit/FlowJsonTests.cpp                   | 163 +++++++++++++++
 .../standard-processors/tests/unit/PutTCPTests.cpp |   4 +-
 .../tests/unit/YamlConfigurationTests.cpp          | 153 +++-----------
 .../windows-event-log/ConsumeWindowsEventLog.cpp   |  25 ++-
 .../windows-event-log/ConsumeWindowsEventLog.h     |   7 +-
 extensions/windows-event-log/tests/CMakeLists.txt  |   6 +-
 .../tests/ConsumeWindowsEventLogTests.cpp          |   3 +-
 .../windows-event-log/tests/LookupCacherTests.cpp  |  77 +++++++
 .../tests/MetadataWalkerTests.cpp                  |  24 +--
 .../wel/LookupCacher.cpp}                          |  34 ++-
 .../windows-event-log/wel/LookupCacher.h           |  45 ++--
 .../windows-event-log/wel/MetadataWalker.cpp       |   5 +-
 extensions/windows-event-log/wel/MetadataWalker.h  |  10 +-
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/RemoteProcessorGroupPort.h       |   4 +
 libminifi/include/core/ContentRepository.h         |   2 +
 libminifi/include/core/ProcessGroup.h              |   2 -
 libminifi/include/core/ProcessorConfig.h           |   6 +-
 .../core/flow/AdaptiveConfiguration.h}             |  15 +-
 libminifi/include/core/flow/CheckRequiredField.h   |   6 +-
 libminifi/include/core/flow/FlowSchema.h           |  88 ++++++++
 libminifi/include/core/flow/Node.h                 |  59 +++++-
 .../include/core/flow/StructuredConfiguration.h    |  28 ++-
 .../include/core/flow/StructuredConnectionParser.h |  10 +-
 libminifi/include/core/json/JsonConfiguration.h    |  51 -----
 libminifi/include/core/json/JsonNode.h             |  19 ++
 .../include/core/repository/FileSystemRepository.h |   2 +
 .../core/repository/VolatileContentRepository.h    |   4 +
 .../include/core/state/nodes/SchedulingNodes.h     |   4 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |   9 -
 libminifi/include/core/yaml/YamlNode.h             |  16 +-
 libminifi/include/properties/Configuration.h       |   3 +-
 libminifi/src/Configuration.cpp                    |   3 +-
 libminifi/src/core/ConfigurationFactory.cpp        |  14 +-
 libminifi/src/core/ProcessGroup.cpp                |   6 +-
 libminifi/src/core/flow/AdaptiveConfiguration.cpp  |  67 ++++++
 libminifi/src/core/flow/CheckRequiredField.cpp     |  18 +-
 libminifi/src/core/flow/FlowSchema.cpp             | 144 +++++++++++++
 libminifi/src/core/flow/Node.cpp                   |  10 +-
 .../src/core/flow/StructuredConfiguration.cpp      | 227 +++++++++++----------
 .../src/core/flow/StructuredConnectionParser.cpp   |  29 ++-
 libminifi/src/core/json/JsonConfiguration.cpp      |  89 --------
 .../src/core/repository/FileSystemRepository.cpp   |  13 ++
 libminifi/src/core/yaml/YamlConfiguration.cpp      |  24 +--
 libminifi/test/ConfigurationTestController.h       |  56 +++++
 libminifi/test/flow-tests/SessionTests.cpp         |   8 +-
 .../test/persistence-tests/PersistenceTests.cpp    |  10 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  39 ++++
 libminifi/test/rocksdb-tests/EncryptionTests.cpp   |   9 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         | 120 ++++++++---
 libminifi/test/unit/FileSystemRepositoryTests.cpp  |  24 +++
 minifi_main/MiNiFiMain.cpp                         |   2 +-
 ...e_gcc_clang_compiler_options_from_windows.patch |  37 ++++
 84 files changed, 1861 insertions(+), 791 deletions(-)
 create mode 100644 cmake/LZ4.cmake
 create mode 100644 cmake/Zstd.cmake
 copy cmake/{bzip2/dummy/FindBZip2.cmake => lz4/dummy/Findlz4.cmake} (56%)
 copy cmake/{bzip2/dummy/FindBZip2.cmake => zstd/dummy/Findzstd.cmake} (56%)
 create mode 100644 docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
 create mode 100644 extensions/rocksdb-repos/database/RocksDbUtils.cpp
 create mode 100644 extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
 create mode 100644 extensions/standard-processors/tests/unit/FlowJsonTests.cpp
 create mode 100644 extensions/windows-event-log/tests/LookupCacherTests.cpp
 copy extensions/{sql/data/SQLColumnIdentifier.cpp => windows-event-log/wel/LookupCacher.cpp} (58%)
 copy libminifi/include/utils/crypto/ciphers/XSalsa20.h => extensions/windows-event-log/wel/LookupCacher.h (53%)
 copy libminifi/{src/core/flow/Node.cpp => include/core/flow/AdaptiveConfiguration.h} (74%)
 create mode 100644 libminifi/include/core/flow/FlowSchema.h
 delete mode 100644 libminifi/include/core/json/JsonConfiguration.h
 create mode 100644 libminifi/src/core/flow/AdaptiveConfiguration.cpp
 create mode 100644 libminifi/src/core/flow/FlowSchema.cpp
 delete mode 100644 libminifi/src/core/json/JsonConfiguration.cpp
 create mode 100644 libminifi/test/ConfigurationTestController.h
 create mode 100644 thirdparty/zstd/exclude_gcc_clang_compiler_options_from_windows.patch


[nifi-minifi-cpp] 01/04: MINIFICPP-2035 NiFi flow json format support

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ce288b920d2ac6dd1a445524b81aaf88baa501a9
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Feb 17 01:09:38 2023 +0100

    MINIFICPP-2035 NiFi flow json format support
    
    Closes #1494
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../integration/MiNiFi_integration_test_driver.py  |   3 +
 docker/test/integration/cluster/ContainerStore.py  |   3 +
 .../test/integration/cluster/DockerTestCluster.py  |   3 +
 .../cluster/containers/MinifiContainer.py          |   9 +-
 docker/test/integration/features/s2s.feature       |   7 +-
 docker/test/integration/features/script.feature    |   3 +-
 docker/test/integration/features/sql.feature       |   3 +-
 .../Minifi_flow_json_serializer.py                 | 141 +++++++++++++
 docker/test/integration/steps/steps.py             |   5 +
 extensions/http-curl/protocols/RESTSender.cpp      |   1 +
 .../tests/unit/AdaptiveConfigurationTests.cpp      |  66 ++++++
 .../tests/unit/FlowJsonTests.cpp                   | 163 +++++++++++++++
 .../tests/unit/YamlConfigurationTests.cpp          | 153 +++-----------
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/RemoteProcessorGroupPort.h       |   4 +
 libminifi/include/core/ProcessGroup.h              |   2 -
 libminifi/include/core/ProcessorConfig.h           |   6 +-
 .../AdaptiveConfiguration.h}                       |  31 +--
 libminifi/include/core/flow/CheckRequiredField.h   |   6 +-
 libminifi/include/core/flow/FlowSchema.h           |  88 ++++++++
 libminifi/include/core/flow/Node.h                 |  59 +++++-
 .../include/core/flow/StructuredConfiguration.h    |  28 ++-
 .../include/core/flow/StructuredConnectionParser.h |  10 +-
 libminifi/include/core/json/JsonNode.h             |  19 ++
 .../include/core/state/nodes/SchedulingNodes.h     |   4 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |   9 -
 libminifi/include/core/yaml/YamlNode.h             |  16 +-
 libminifi/src/core/ConfigurationFactory.cpp        |  14 +-
 libminifi/src/core/ProcessGroup.cpp                |   6 +-
 libminifi/src/core/flow/AdaptiveConfiguration.cpp  |  67 ++++++
 libminifi/src/core/flow/CheckRequiredField.cpp     |  18 +-
 libminifi/src/core/flow/FlowSchema.cpp             | 144 +++++++++++++
 libminifi/src/core/flow/Node.cpp                   |  10 +-
 .../src/core/flow/StructuredConfiguration.cpp      | 227 +++++++++++----------
 .../src/core/flow/StructuredConnectionParser.cpp   |  29 ++-
 libminifi/src/core/json/JsonConfiguration.cpp      |  89 --------
 libminifi/src/core/yaml/YamlConfiguration.cpp      |  24 +--
 libminifi/test/ConfigurationTestController.h       |  56 +++++
 minifi_main/MiNiFiMain.cpp                         |   2 +-
 39 files changed, 1076 insertions(+), 454 deletions(-)

diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index e3153dffe..21dec70b4 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -315,3 +315,6 @@ class MiNiFi_integration_test:
 
     def enable_sql_in_minifi(self):
         self.cluster.enable_sql_in_minifi()
+
+    def set_yaml_in_minifi(self):
+        self.cluster.set_yaml_in_minifi()
diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py
index 867e41076..fb5f9c6b8 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -160,6 +160,9 @@ class ContainerStore:
     def enable_sql_in_minifi(self):
         self.minifi_options.enable_sql = True
 
+    def set_yaml_in_minifi(self):
+        self.minifi_options.config_format = "yaml"
+
     def get_startup_finished_log_entry(self, container_name):
         return self.containers[container_name].get_startup_finished_log_entry()
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index 9688c732e..0a07119a5 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -85,6 +85,9 @@ class DockerTestCluster:
     def enable_sql_in_minifi(self):
         self.container_store.enable_sql_in_minifi()
 
+    def set_yaml_in_minifi(self):
+        self.container_store.set_yaml_in_minifi()
+
     def get_app_log(self, container_name):
         log_source = self.container_store.log_source(container_name)
         if log_source == LogSource.FROM_DOCKER_CONTAINER:
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
index 93d65ddda..7f70d4fb7 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -21,6 +21,7 @@ import shutil
 import copy
 from .FlowContainer import FlowContainer
 from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+from minifi.flow_serialization.Minifi_flow_json_serializer import Minifi_flow_json_serializer
 
 
 class MinifiOptions:
@@ -30,6 +31,7 @@ class MinifiOptions:
         self.enable_provenance = False
         self.enable_prometheus = False
         self.enable_sql = False
+        self.config_format = "json"
 
 
 class MinifiContainer(FlowContainer):
@@ -58,7 +60,12 @@ class MinifiContainer(FlowContainer):
         return "Starting Flow Controller"
 
     def _create_config(self):
-        serializer = Minifi_flow_yaml_serializer()
+        if self.options.config_format == "yaml":
+            serializer = Minifi_flow_yaml_serializer()
+        elif self.options.config_format == "json":
+            serializer = Minifi_flow_json_serializer()
+        else:
+            assert False, "Invalid flow configuration format: {}".format(self.options.config_format)
         test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
         with open(os.path.join(self.container_specific_config_dir, "config.yml"), 'wb') as config_file:
diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature
index a836a50c5..a51589a02 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/docker/test/integration/features/s2s.feature
@@ -35,7 +35,8 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
     Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
 
   Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
     And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
 
@@ -47,7 +48,9 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
     Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds
 
   Scenario: Zero length files are not transfered between via s2s if the "drop empty" connection property is true
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    # "drop empty" is only supported with yaml config
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
     And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
     And the connection going to the RemoteProcessGroup has "drop empty" set
diff --git a/docker/test/integration/features/script.feature b/docker/test/integration/features/script.feature
index 4a31b3c42..b8ca2e5f6 100644
--- a/docker/test/integration/features/script.feature
+++ b/docker/test/integration/features/script.feature
@@ -29,7 +29,8 @@ Feature: MiNiFi can execute Lua and Python scripts
     Then the Minifi logs contain the following message: "Sleeping forever" 3 times after 5 seconds
 
   Scenario: ExecuteScript should only allow one Python script running at a time
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And the scheduling period of the GenerateFlowFile processor is set to "500 ms"
     And a ExecuteScript processor with the "Script File" property set to "/tmp/resources/python/sleep_forever.py"
     And the "Script Engine" property of the ExecuteScript processor is set to "python"
diff --git a/docker/test/integration/features/sql.feature b/docker/test/integration/features/sql.feature
index 768ed4d7e..bafab8e95 100644
--- a/docker/test/integration/features/sql.feature
+++ b/docker/test/integration/features/sql.feature
@@ -33,7 +33,8 @@ Feature: Executing SQL operations from MiNiFi-C++
     Then the query "SELECT * FROM test_table WHERE int_col = 42" returns 1 rows in less than 120 seconds on the PostgreSQL server
 
   Scenario: A MiNiFi instance can query to test table with ExecuteSQL processor
-    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    Given a MiNiFi CPP server with yaml config
+    And a GenerateFlowFile processor with the "File Size" property set to "0B"
     And a UpdateAttribute processor with the "sql.args.1.value" property set to "apple"
     And the "sql.args.2.value" property of the UpdateAttribute processor is set to "banana"
     And a ExecuteSQL processor with the "SQL select query" property set to "SELECT * FROM test_table WHERE text_col = ? OR text_col = ? ORDER BY int_col DESC"
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
new file mode 100644
index 000000000..4468a6107
--- /dev/null
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import uuid
+import json
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+from ..core.Funnel import Funnel
+
+
+class Minifi_flow_json_serializer:
+    def serialize(self, start_nodes, controllers):
+        res = {
+            'rootGroup': {
+                'name': 'MiNiFi Flow',
+                'processors': [],
+                'funnels': [],
+                'connections': [],
+                'remoteProcessGroups': [],
+                'controllerServices': []
+            }
+        }
+        visited = []
+
+        for node in start_nodes:
+            self.serialize_node(node, res['rootGroup'], visited)
+
+        for controller in controllers:
+            self.serialize_controller(controller, res['rootGroup'])
+
+        return json.dumps(res)
+
+    def serialize_node(self, connectable, root, visited):
+        visited.append(connectable)
+
+        if hasattr(connectable, 'name'):
+            connectable_name = connectable.name
+        else:
+            connectable_name = str(connectable.uuid)
+
+        if isinstance(connectable, InputPort):
+            group = connectable.remote_process_group
+            res_group = None
+
+            for res_group_candidate in root['remoteProcessGroups']:
+                assert isinstance(res_group_candidate, dict)
+                if res_group_candidate['identifier'] == str(group.uuid):
+                    res_group = res_group_candidate
+
+            if res_group is None:
+                res_group = {
+                    'name': group.name,
+                    'identifier': str(group.uuid),
+                    'targetUri': group.url,
+                    'communicationsTimeout': '30 sec',
+                    'yieldDuration': '3 sec',
+                    'inputPorts': []
+                }
+
+                root['remoteProcessGroups'].append(res_group)
+
+            res_group['inputPorts'].append({
+                'identifier': str(connectable.uuid),
+                'name': connectable.name
+            })
+
+        if isinstance(connectable, Processor):
+            root['processors'].append({
+                'name': connectable_name,
+                'identifier': str(connectable.uuid),
+                'type': connectable.class_prefix + connectable.clazz,
+                'schedulingStrategy': connectable.schedule['scheduling strategy'],
+                'schedulingPeriod': connectable.schedule['scheduling period'],
+                'penaltyDuration': connectable.schedule['penalization period'],
+                'yieldDuration': connectable.schedule['yield period'],
+                'runDurationMillis': connectable.schedule['run duration nanos'],
+                'properties': connectable.properties,
+                'autoTerminatedRelationships': connectable.auto_terminate,
+                'concurrentlySchedulableTaskCount': connectable.max_concurrent_tasks
+            })
+
+            for svc in connectable.controller_services:
+                if svc in visited:
+                    continue
+
+                visited.append(svc)
+                root['controllerServices'].append({
+                    'name': svc.name,
+                    'identifier': svc.id,
+                    'type': svc.service_class,
+                    'properties': svc.properties
+                })
+
+        if isinstance(connectable, Funnel):
+            root['funnels'].append({
+                'identifier': str(connectable.uuid)
+            })
+
+        for conn_name in connectable.connections:
+            conn_procs = connectable.connections[conn_name]
+
+            if not isinstance(conn_procs, list):
+                conn_procs = [conn_procs]
+
+            for proc in conn_procs:
+                root['connections'].append({
+                    'name': str(uuid.uuid4()),
+                    'source': {'id': str(connectable.uuid)},
+                    'destination': {'id': str(proc.uuid)}
+                })
+                if (all(str(connectable.uuid) != x['identifier'] for x in root['funnels'])):
+                    root['connections'][-1]['selectedRelationships'] = [conn_name]
+                if proc not in visited:
+                    self.serialize_node(proc, root, visited)
+
+    def serialize_controller(self, controller, root):
+        if hasattr(controller, 'name'):
+            connectable_name = controller.name
+        else:
+            connectable_name = str(controller.uuid)
+
+        root['controllerServices'].append({
+            'name': connectable_name,
+            'identifier': controller.id,
+            'type': controller.service_class,
+            'properties': controller.properties
+        })
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 686c33ebe..cce5eaf30 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1037,3 +1037,8 @@ def step_impl(context, size: str, duration: str) -> None:
 @then(u'the memory usage of the agent decreases to {peak_usage_percent}% peak usage in less than {duration}')
 def step_impl(context, peak_usage_percent: str, duration: str) -> None:
     context.test.check_memory_usage_compared_to_peak(float(peak_usage_percent) * 0.01, humanfriendly.parse_timespan(duration))
+
+
+@given(u'a MiNiFi CPP server with yaml config')
+def step_impl(context):
+    context.test.set_yaml_in_minifi()
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index b58bf3233..ad3036cbe 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -162,6 +162,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct
   if (payload.getOperation() == Operation::TRANSFER) {
     auto read = std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max());
     client.setReadCallback(std::move(read));
+    client.setRequestHeader("Accept", "application/vnd.minifi-c2+json;version=1, text/yml");
   } else {
     // Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible
     // TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
diff --git a/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
new file mode 100644
index 000000000..7bf8ddd27
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "ConfigurationTestController.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+TEST_CASE("Adaptive configuration can parse JSON") {
+  ConfigurationTestController controller;
+
+  const char* json_config = R"(
+    {
+      "Flow Controller": {"name": "root"},
+      "Processors": [
+        {
+          "id": "00000000-0000-0000-0000-000000000001",
+          "class": "DummyProcessor",
+          "name": "Proc1"
+        }
+      ],
+      "Connections": []
+    }
+  )";
+
+  core::flow::AdaptiveConfiguration config{controller.getContext()};
+
+  auto root = config.getRootFromPayload(json_config);
+
+  REQUIRE(root->findProcessorByName("Proc1"));
+}
+
+TEST_CASE("Adaptive configuration can parse YAML") {
+  ConfigurationTestController controller;
+
+  const char* yaml_config = R"(
+Flow Controller:
+  name: root
+Processors:
+- id: 00000000-0000-0000-0000-000000000001
+  class: DummyProcessor
+  name: Proc1
+Connections: []
+  )";
+
+  core::flow::AdaptiveConfiguration config{controller.getContext()};
+
+  auto root = config.getRootFromPayload(yaml_config);
+
+  REQUIRE(root->findProcessorByName("Proc1"));
+}
diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
new file mode 100644
index 000000000..b7d1a47f8
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <memory>
+#include <chrono>
+#include "core/repository/VolatileContentRepository.h"
+#include "core/ProcessGroup.h"
+#include "core/RepositoryFactory.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "utils/TestUtils.h"
+#include "utils/StringUtils.h"
+#include "ConfigurationTestController.h"
+#include "Funnel.h"
+
+using namespace std::literals::chrono_literals;
+
+TEST_CASE("NiFi flow json format is correctly parsed") {
+  ConfigurationTestController test_controller;
+
+  core::flow::AdaptiveConfiguration config(test_controller.getContext());
+
+  static const std::string CONFIG_JSON =
+      R"(
+{
+  "rootGroup": {
+    "name": "MiNiFi Flow",
+    "processors": [{
+      "identifier": "00000000-0000-0000-0000-000000000001",
+      "name": "MyGenFF",
+      "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
+      "concurrentlySchedulableTaskCount": 15,
+      "schedulingStrategy": "TIMER_DRIVEN",
+      "schedulingPeriod": "3 sec",
+      "penaltyDuration": "12 sec",
+      "yieldDuration": "4 sec",
+      "runDurationMillis": 12,
+      "autoTerminatedRelationships": ["one", "two"],
+      "properties": {
+        "File Size": "10 B",
+        "Batch Size": 12
+      }
+    }],
+    "funnels": [{
+      "identifier": "00000000-0000-0000-0000-000000000010",
+      "name": "CoolFunnel"
+    }],
+    "connections": [{
+      "identifier": "00000000-0000-0000-0000-000000000002",
+      "name": "GenToFunnel",
+      "source": {
+        "id": "00000000-0000-0000-0000-000000000001",
+        "name": "MyGenFF"
+      },
+      "destination": {
+        "id": "00000000-0000-0000-0000-000000000010",
+        "name": "CoolFunnel"
+      },
+      "selectedRelationships": ["a", "b"],
+      "backPressureObjectThreshold": 7,
+      "backPressureDataSizeThreshold": "11 KB",
+      "flowFileExpiration": "13 sec"
+    }, {
+     "identifier": "00000000-0000-0000-0000-000000000008",
+      "name": "FunnelToS2S",
+      "source": {
+        "id": "00000000-0000-0000-0000-000000000010",
+        "name": "CoolFunnel"
+      },
+      "destination": {
+        "id": "00000000-0000-0000-0000-000000000003",
+        "name": "AmazingInputPort"
+      },
+      "selectedRelationships": ["success"]
+    }],
+    "remoteProcessGroups": [{
+      "name": "NiFi Flow",
+      "targetUri": "https://localhost:8090/nifi",
+      "yieldDuration": "6 sec",
+      "communicationsTimeout": "19 sec",
+      "inputPorts": [{
+        "identifier": "00000000-0000-0000-0000-000000000003",
+        "name": "AmazingInputPort",
+        "targetId": "00000000-0000-0000-0000-000000000005",
+        "concurrentlySchedulableTaskCount": 7
+      }]
+    }]
+  }
+})";
+
+  std::unique_ptr<core::ProcessGroup> flow = config.getRootFromPayload(CONFIG_JSON);
+  REQUIRE(flow);
+
+  // verify processor
+  auto* proc = flow->findProcessorByName("MyGenFF");
+  REQUIRE(proc);
+  REQUIRE(proc->getUUIDStr() == "00000000-0000-0000-0000-000000000001");
+  REQUIRE(15 == proc->getMaxConcurrentTasks());
+  REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == proc->getSchedulingStrategy());
+  REQUIRE(3s == proc->getSchedulingPeriodNano());
+  REQUIRE(12s == proc->getPenalizationPeriod());
+  REQUIRE(4s == proc->getYieldPeriodMsec());
+  REQUIRE(proc->isAutoTerminated({"one", ""}));
+  REQUIRE(proc->isAutoTerminated({"two", ""}));
+  REQUIRE_FALSE(proc->isAutoTerminated({"three", ""}));
+  REQUIRE(proc->getProperty("File Size") == "10 B");
+  REQUIRE(proc->getProperty("Batch Size") == "12");
+
+  // verify funnel
+  auto* funnel = dynamic_cast<minifi::Funnel*>(flow->findProcessorByName("CoolFunnel"));
+  REQUIRE(funnel);
+  REQUIRE(funnel->getUUIDStr() == "00000000-0000-0000-0000-000000000010");
+
+  // verify RPG input port
+  auto* port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(flow->findProcessorByName("AmazingInputPort"));
+  REQUIRE(port);
+  REQUIRE(port->getUUIDStr() == "00000000-0000-0000-0000-000000000003");
+  REQUIRE(port->getMaxConcurrentTasks() == 7);
+  REQUIRE(port->getInstances().size() == 1);
+  REQUIRE(port->getInstances().front().host_ == "localhost");
+  REQUIRE(port->getInstances().front().port_ == 8090);
+  REQUIRE(port->getInstances().front().protocol_ == "https://");
+  REQUIRE(port->getProperty("Port UUID") == "00000000-0000-0000-0000-000000000005");
+
+  // verify connection
+  std::map<std::string, minifi::Connection*> connection_map;
+  flow->getConnections(connection_map);
+  REQUIRE(4 == connection_map.size());
+  auto connection1 = connection_map.at("00000000-0000-0000-0000-000000000002");
+  REQUIRE(connection1);
+  REQUIRE("GenToFunnel" == connection1->getName());
+  REQUIRE(connection1->getSource() == proc);
+  REQUIRE(connection1->getDestination() == funnel);
+  REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}}));
+  REQUIRE(connection1->getMaxQueueSize() == 7);
+  REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB);
+  REQUIRE(13s == connection1->getFlowExpirationDuration());
+
+  auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008");
+  REQUIRE(connection2);
+  REQUIRE("FunnelToS2S" == connection2->getName());
+  REQUIRE(connection2->getSource() == funnel);
+  REQUIRE(connection2->getDestination() == port);
+  REQUIRE(connection2->getRelationships() == (std::set<core::Relationship>{{"success", ""}}));
+}
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index b14ee1f83..651480578 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -28,17 +28,14 @@
 #include "Catch.h"
 #include "utils/TestUtils.h"
 #include "utils/StringUtils.h"
+#include "ConfigurationTestController.h"
 
 using namespace std::literals::chrono_literals;
 
 TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   SECTION("loading YAML without optional component IDs works") {
     static const std::string CONFIG_YAML_WITHOUT_IDS =
@@ -222,13 +219,9 @@ Provenance Reporting:
 }
 
 TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML =
       R"(
@@ -347,13 +340,9 @@ NiFi Properties Overrides: {}
 }
 
 TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML =
       R"(
@@ -496,17 +485,9 @@ NiFi Properties Overrides: {}
 }
 
 TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setTrace<core::YamlConfiguration>();
-
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -531,17 +512,9 @@ Processors:
 }
 
 TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -567,25 +540,16 @@ Processors:
   } catch (const std::exception &e) {
     caught_exception = true;
     REQUIRE("Unable to parse configuration file for component named 'XYZ' because required property "
-        "'Input Directory' is not set [in 'Processors' section of configuration file]" == std::string(e.what()));
+        "'Input Directory' is not set [in '/Processors/0/Properties' section of configuration file]" == std::string(e.what()));
   }
 
   REQUIRE(caught_exception);
 }
 
 TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  logTestController.setDebug<core::Processor>();
-
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string TEST_CONFIG_YAML = R"(
 Flow Controller:
@@ -623,17 +587,9 @@ class DummyComponent : public core::ConfigurableComponent {
 };
 
 TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -644,17 +600,9 @@ TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
 }
 
 TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "", false, "", { }, { }),
@@ -673,16 +621,9 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
 }
 
 TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -693,15 +634,9 @@ TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
 }
 
 TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
-  TestController test_controller;
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -712,16 +647,9 @@ TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
 }
 
 TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]") {
-  TestController test_controller;
-
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -740,15 +668,8 @@ TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]")
 }
 
 TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
-  TestController test_controller;
-  LogTestController &logTestController = LogTestController::getInstance();
-  logTestController.setDebug<TestPlan>();
-  logTestController.setDebug<core::YamlConfiguration>();
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  ConfigurationTestController test_controller;
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
   const auto component = std::make_shared<DummyComponent>();
   component->setSupportedProperties(std::array{
     core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -767,13 +688,9 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
 }
 
 TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
-  TestController test_controller;
+  ConfigurationTestController test_controller;
 
-  std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+  core::YamlConfiguration yamlConfig(test_controller.getContext());
 
   static const std::string CONFIG_YAML_WITH_FUNNEL =
     R"(
@@ -858,12 +775,8 @@ Remote Process Groups: []
 }
 
 TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
-  TestController test_controller;
-  std::shared_ptr<core::Repository> test_flow_file_repo = core::createRepository("flowfilerepository");
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  core::YamlConfiguration yaml_config({test_flow_file_repo, content_repo, stream_factory, configuration});
+  ConfigurationTestController test_controller;
+  core::YamlConfiguration yaml_config(test_controller.getContext());
 
   for (char i = '1'; i <= '8'; ++i) {
     DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 354e944e2..dc51d0148 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -60,7 +60,7 @@ if (NOT OPENSSL_OFF)
     set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" " [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
 
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 6cf1778ce..3e8cee690 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -160,6 +160,10 @@ class RemoteProcessorGroupPort : public core::Processor {
     }
   }
 
+  std::vector<RPG> getInstances() const {
+    return this->nifi_instances_;  // returned by value, so the caller receives its own copy
+  }
+
   void setHTTPProxy(const utils::HTTPProxy &proxy) {
     this->proxy_ = proxy;
   }
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index af43f8e02..89b1aa2cd 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -60,8 +60,6 @@ enum ProcessGroupType {
   REMOTE_PROCESS_GROUP,
 };
 
-#define ONSCHEDULE_RETRY_INTERVAL 30000  // millisecs
-
 class ProcessGroup : public CoreComponent {
   friend struct ::ProcessGroupTestAccessor;
  public:
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index fee9bd7e9..d454a3f69 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -30,11 +30,11 @@ namespace minifi {
 namespace core {
 
 
-#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN"
-#define DEFAULT_SCHEDULING_PERIOD_STR "1 sec"
+constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"};
+constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"};
 constexpr std::chrono::milliseconds DEFAULT_SCHEDULING_PERIOD_MILLIS{1000};
 constexpr std::chrono::nanoseconds DEFAULT_RUN_DURATION{0};
-#define DEFAULT_MAX_CONCURRENT_TASKS 1
+constexpr int DEFAULT_MAX_CONCURRENT_TASKS{1};
 constexpr std::chrono::seconds DEFAULT_YIELD_PERIOD_SECONDS{1};
 constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
 
diff --git a/libminifi/include/core/json/JsonConfiguration.h b/libminifi/include/core/flow/AdaptiveConfiguration.h
similarity index 55%
rename from libminifi/include/core/json/JsonConfiguration.h
rename to libminifi/include/core/flow/AdaptiveConfiguration.h
index 35623381d..67a4c5885 100644
--- a/libminifi/include/core/json/JsonConfiguration.h
+++ b/libminifi/include/core/flow/AdaptiveConfiguration.h
@@ -15,37 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
-#include <memory>
-#include <optional>
 #include <string>
-#include <unordered_set>
+#include <memory>
 
-#include "core/FlowConfiguration.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "core/ProcessorConfig.h"
-#include "Exception.h"
-#include "io/StreamFactory.h"
-#include "io/validation.h"
-#include "sitetosite/SiteToSite.h"
-#include "utils/Id.h"
-#include "utils/StringUtils.h"
-#include "utils/file/FileSystem.h"
-#include "core/flow/StructuredConfiguration.h"
+#include "StructuredConfiguration.h"
 
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi::core::flow {
 
-class JsonConfiguration : public flow::StructuredConfiguration {
+class AdaptiveConfiguration : public StructuredConfiguration {
  public:
-  explicit JsonConfiguration(ConfigurationContext ctx);
-
-  ~JsonConfiguration() override = default;
-
-  std::unique_ptr<core::ProcessGroup> getRoot() override;
+  explicit AdaptiveConfiguration(ConfigurationContext ctx);
 
-  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &json_config) override;
+  std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &payload) override;
 };
 
-}  // namespace org::apache::nifi::minifi::core
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/CheckRequiredField.h b/libminifi/include/core/flow/CheckRequiredField.h
index 6c35b0cbc..11dc2b1e1 100644
--- a/libminifi/include/core/flow/CheckRequiredField.h
+++ b/libminifi/include/core/flow/CheckRequiredField.h
@@ -27,7 +27,7 @@
 namespace org::apache::nifi::minifi::core::flow {
 
 bool isFieldPresent(const Node &node, std::string_view field_name);
-std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section = "");
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names);
 
 /**
  * This is a helper function for verifying the existence of a required
@@ -47,8 +47,8 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> &
  * @throws std::invalid_argument if the required field 'field_name' is
  *                               not present in 'node'
  */
-void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section = "", std::string_view error_message = "");
+void checkRequiredField(const Node &node, const std::vector<std::string>& field_name, std::string_view error_message = "");
 
-std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message = {});
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message = {});
 
 }  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h
new file mode 100644
index 000000000..0badac303
--- /dev/null
+++ b/libminifi/include/core/flow/FlowSchema.h
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+#include <string>
+
+namespace org::apache::nifi::minifi::core::flow {
+
+struct FlowSchema {  // one Keys entry per flow-configuration field; NOTE(review): presumably maps each field to its key name(s) in a given file format - confirm
+  using Keys = std::vector<std::string>;  // list of key strings; NOTE(review): presumably alternatives tried in order by the parser - confirm
+
+  Keys flow_header;
+  Keys root_group;
+
+  Keys processors;
+  Keys processor_properties;
+  Keys autoterminated_rels;
+  Keys max_concurrent_tasks;
+  Keys penalization_period;
+  Keys proc_yield_period;
+  Keys runduration_nanos;
+  Keys onschedule_retry_interval;
+
+  Keys connections;
+  Keys max_queue_size;
+  Keys max_queue_data_size;
+  Keys swap_threshold;
+  Keys source_id;
+  Keys source_name;
+  Keys destination_id;
+  Keys destination_name;
+  Keys flowfile_expiration;
+  Keys drop_empty;
+  Keys source_relationship;
+  Keys source_relationship_list;
+
+  Keys process_groups;
+  Keys process_group_version;
+  Keys scheduling_strategy;
+  Keys scheduling_period;
+  Keys name;
+  Keys identifier;
+  Keys type;
+  Keys controller_services;
+  Keys controller_service_properties;
+  Keys remote_process_group;
+  Keys provenance_reporting;
+  Keys provenance_reporting_port_uuid;
+  Keys provenance_reporting_batch_size;
+  Keys funnels;
+  Keys input_ports;
+  Keys output_ports;
+  Keys rpg_url;
+  Keys rpg_yield_period;
+  Keys rpg_timeout;
+  Keys rpg_local_network_interface;
+  Keys rpg_transport_protocol;
+  Keys rpg_proxy_host;
+  Keys rpg_proxy_user;
+  Keys rpg_proxy_password;
+  Keys rpg_proxy_port;
+  Keys rpg_input_ports;
+  Keys rpg_output_ports;
+  Keys rpg_port_properties;
+  Keys rpg_port_target_id;
+
+  static FlowSchema getDefault();  // NOTE(review): presumably the key names of MiNiFi's own configuration layout - confirm in the implementation
+  static FlowSchema getNiFiFlowJson();  // NOTE(review): presumably the key names of NiFi's flow json layout - confirm in the implementation
+};
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/Node.h b/libminifi/include/core/flow/Node.h
index 78735f95d..1bd3ad7e2 100644
--- a/libminifi/include/core/flow/Node.h
+++ b/libminifi/include/core/flow/Node.h
@@ -25,6 +25,8 @@
 #include <memory>
 #include <utility>
 #include "nonstd/expected.hpp"
+#include "utils/StringUtils.h"
+#include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::core::flow {
 
@@ -37,6 +39,7 @@ class Node {
   };
 
   class Iterator {
+    friend class Node;
    public:
     class Value;
 
@@ -53,6 +56,7 @@ class Node {
 
     Iterator& operator++() {
       impl_->operator++();
+      ++idx_;
       return *this;
     }
 
@@ -75,6 +79,8 @@ class Node {
 
    private:
     std::unique_ptr<IteratorImpl> impl_;
+    std::string path_;
+    int idx_{0};
   };
 
   class NodeImpl {
@@ -88,6 +94,7 @@ class Node {
     virtual nonstd::expected<bool, std::exception_ptr> getBool() const = 0;
     virtual nonstd::expected<int64_t, std::exception_ptr> getInt64() const = 0;
     virtual nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const = 0;
+    virtual nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const = 0;
 
     virtual std::string getDebugString() const = 0;
 
@@ -98,6 +105,8 @@ class Node {
 
     virtual std::optional<Cursor> getCursor() const = 0;
 
+    virtual Node createEmpty() const = 0;
+
     virtual ~NodeImpl() = default;
   };
 
@@ -113,6 +122,7 @@ class Node {
   nonstd::expected<bool, std::exception_ptr> getBool() const {return impl_->getBool();}
   nonstd::expected<int64_t, std::exception_ptr> getInt64() const {return impl_->getInt64();}
   nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const {return impl_->getIntegerAsString();}
+  nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const {return impl_->getScalarAsString();}
 
   // return a string representation of the node (need not to be deserializable)
   std::string getDebugString() const {return impl_->getDebugString();}
@@ -121,14 +131,57 @@ class Node {
   size_t empty() const {
     return size() == 0;
   }
-  Iterator begin() const {return impl_->begin();}
-  Iterator end() const {return impl_->end();}
-  Node operator[](std::string_view key) const {return impl_->operator[](key);}
+  Iterator begin() const {
+    Iterator first = impl_->begin();
+    first.path_ = path_;  // the iterator carries this node's path along
+    return first;
+  }
+  Iterator end() const {
+    Iterator past_last = impl_->end();
+    past_last.path_ = path_;  // the iterator carries this node's path along
+    return past_last;
+  }
+
+  // treats @key as the literal name of a direct member (no '/' splitting); the result's path is "<this path>/<key>"
+  Node getMember(std::string_view key) const {
+    Node result = impl_->operator[](key);
+    result.path_ = utils::StringUtils::join_pack(path_, "/", key);
+    return result;
+  }
+
+  // treats @key as a '/'-delimited access path, descending one segment at a time; stops at the first missing node
+  Node operator[](std::string_view key) const {
+    Node result = *this;
+    for (const auto& field : utils::StringUtils::split(std::string{key}, "/")) {
+      if (field == ".") {
+        // a "." segment refers to the current node itself, so there is nothing to descend into
+      } else {
+        result = result.getMember(field);
+      }
+      if (!result) {
+        break;
+      }
+    }
+    return result;
+  }
+
+  // tries each access path in @keys in order and returns the first one that resolves to a valid node
+  Node operator[](gsl::span<const std::string> keys) const {
+    for (const auto& candidate_path : keys) {
+      if (Node found = (*this)[candidate_path]) {
+        return found;
+      }
+    }
+    return impl_->createEmpty();
+  }
+
+  std::string getPath() const {return path_;}  // path built up by getMember(): parent path + "/" + member key
 
   std::optional<Cursor> getCursor() const {return impl_->getCursor();}
 
  private:
   std::shared_ptr<NodeImpl> impl_;
+  std::string path_;
 };
 
 class Node::Iterator::Value : public Node, public std::pair<Node, Node> {
diff --git a/libminifi/include/core/flow/StructuredConfiguration.h b/libminifi/include/core/flow/StructuredConfiguration.h
index 1b46cda9d..762a2b352 100644
--- a/libminifi/include/core/flow/StructuredConfiguration.h
+++ b/libminifi/include/core/flow/StructuredConfiguration.h
@@ -21,6 +21,7 @@
 #include <optional>
 #include <string>
 #include <unordered_set>
+#include <vector>
 
 #include "core/FlowConfiguration.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -33,19 +34,10 @@
 #include "utils/StringUtils.h"
 #include "utils/file/FileSystem.h"
 #include "core/flow/Node.h"
+#include "FlowSchema.h"
 
 namespace org::apache::nifi::minifi::core::flow {
 
-static constexpr char const* CONFIG_FLOW_CONTROLLER_KEY = "Flow Controller";
-static constexpr char const* CONFIG_PROCESSORS_KEY = "Processors";
-static constexpr char const* CONFIG_CONTROLLER_SERVICES_KEY = "Controller Services";
-static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
-static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
-static constexpr char const* CONFIG_PROVENANCE_REPORT_KEY = "Provenance Reporting";
-static constexpr char const* CONFIG_FUNNELS_KEY = "Funnels";
-static constexpr char const* CONFIG_INPUT_PORTS_KEY = "Input Ports";
-static constexpr char const* CONFIG_OUTPUT_PORTS_KEY = "Output Ports";
-
 class StructuredConfiguration : public FlowConfiguration {
  public:
   StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger);
@@ -60,6 +52,8 @@ class StructuredConfiguration : public FlowConfiguration {
    */
   void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const;
 
+  std::unique_ptr<core::ProcessGroup> getRoot() override;
+
  protected:
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
@@ -70,7 +64,7 @@ class StructuredConfiguration : public FlowConfiguration {
    * @return             the root ProcessGroup node of the flow
    *                       configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node);
+  std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node, FlowSchema schema);
 
   std::unique_ptr<core::ProcessGroup> createProcessGroup(const Node& node, bool is_root = false);
 
@@ -99,7 +93,7 @@ class StructuredConfiguration : public FlowConfiguration {
    * @param parent    the parent ProcessGroup for the port
    * @param direction the TransferDirection of the port
    */
-  void parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
+  void parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
 
   /**
    * Parses the root level node for the flow configuration and
@@ -155,7 +149,7 @@ class StructuredConfiguration : public FlowConfiguration {
    * @param properties_node the Node containing the properties
    * @param processor      the Processor to which to add the resulting properties
    */
-  void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section);
+  void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name);
 
   /**
    * Parses the Funnels section of a configuration.
@@ -195,9 +189,9 @@ class StructuredConfiguration : public FlowConfiguration {
    *                   is optional and defaults to 'id'
    * @return         the parsed or generated UUID string
    */
-  std::string getOrGenerateId(const Node& node, const std::string& id_field = "id");
+  std::string getOrGenerateId(const Node& node);
 
-  std::string getRequiredIdField(const Node& node, std::string_view section = "", std::string_view error_message = "");
+  std::string getRequiredIdField(const Node& node, std::string_view error_message = "");
 
   /**
    * This is a helper function for getting an optional value, if it exists.
@@ -213,7 +207,9 @@ class StructuredConfiguration : public FlowConfiguration {
    *                       the optional field is missing. If not provided,
    *                       a default info message will be generated.
    */
-  std::string getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section = "", const std::string& info_message = "");
+  std::string getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message = "");
+
+  FlowSchema schema_;
 
   static std::shared_ptr<utils::IdGenerator> id_generator_;
   std::unordered_set<std::string> uuids_;
diff --git a/libminifi/include/core/flow/StructuredConnectionParser.h b/libminifi/include/core/flow/StructuredConnectionParser.h
index 2b1d73743..6d7869ea6 100644
--- a/libminifi/include/core/flow/StructuredConnectionParser.h
+++ b/libminifi/include/core/flow/StructuredConnectionParser.h
@@ -26,18 +26,19 @@
 
 #include "core/flow/Node.h"
 #include "utils/gsl.h"
+#include "core/flow/FlowSchema.h"
 
 namespace org::apache::nifi::minifi::core::flow {
 
 class StructuredConnectionParser {
  public:
-  static constexpr const char* CONFIG_CONNECTIONS_KEY{ "Connections" };
-
-  explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
+  explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent,
+                                      const std::shared_ptr<logging::Logger>& logger, std::optional<FlowSchema> schema = std::nullopt) :
       connectionNode_(connectionNode),
       name_(name),
       parent_(parent),
-      logger_(logger) {
+      logger_(logger),
+      schema_(schema.value_or(FlowSchema::getDefault())) {
     if (!connectionNode.isMap()) {
       throw std::logic_error("Connection node is not a map");
     }
@@ -60,6 +61,7 @@ class StructuredConnectionParser {
   const std::string& name_;
   gsl::not_null<core::ProcessGroup*> parent_;
   const std::shared_ptr<logging::Logger> logger_;
+  const FlowSchema schema_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/json/JsonNode.h b/libminifi/include/core/json/JsonNode.h
index d19ca1582..bb11ca251 100644
--- a/libminifi/include/core/json/JsonNode.h
+++ b/libminifi/include/core/json/JsonNode.h
@@ -46,6 +46,10 @@ class JsonNode : public flow::Node::NodeImpl {
     return node_ ? node_->IsNull() : false;
   }
 
+  flow::Node createEmpty() const override {  // a fresh Node wrapping a JsonNode that holds no json value (nullptr)
+    return flow::Node{std::make_shared<JsonNode>(nullptr)};
+  }
+
   nonstd::expected<std::string, std::exception_ptr> getString() const override {
     try {
       if (!node_) {
@@ -93,12 +97,27 @@ class JsonNode : public flow::Node::NodeImpl {
       if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
       if (node_->IsInt64()) return std::to_string(node_->GetInt64());
       if (node_->IsUint64()) return std::to_string(node_->GetUint64());
+      if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength());
       throw std::runtime_error("Cannot get string from non-integer json value");
     } catch (...) {
       return nonstd::make_unexpected(std::current_exception());
     }
   }
 
+  nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override {  // bool/integer/string/double -> string, anything else is an error
+    try {
+      if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
+      if (node_->IsBool()) return node_->GetBool() ? "true" : "false";
+      if (node_->IsInt64()) return std::to_string(node_->GetInt64());
+      if (node_->IsUint64()) return std::to_string(node_->GetUint64());  // reached only when the IsInt64 check above did not match
+      if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength());  // explicit length, no reliance on a terminating NUL
+      if (node_->IsDouble()) return std::to_string(node_->GetDouble());  // NOTE(review): std::to_string formats doubles like "%f" (fixed 6 decimals)
+      throw std::runtime_error("Cannot convert non-scalar json value to string");
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());  // failures are returned as the error value, not thrown to the caller
+    }
+  }
+
   std::string getDebugString() const override {
     if (!node_) return "<invalid>";
     if (node_->IsObject()) return "<Map>";
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index 2262cd6e7..a98bb3bd6 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -49,7 +49,7 @@ class SchedulingDefaults : public DeviceInformation {
 
     SerializedResponseNode defaultSchedulingStrategy;
     defaultSchedulingStrategy.name = "defaultSchedulingStrategy";
-    defaultSchedulingStrategy.value = DEFAULT_SCHEDULING_STRATEGY;
+    defaultSchedulingStrategy.value = core::DEFAULT_SCHEDULING_STRATEGY;
 
     schedulingDefaults.children.push_back(defaultSchedulingStrategy);
 
@@ -67,7 +67,7 @@ class SchedulingDefaults : public DeviceInformation {
 
     SerializedResponseNode defaultMaxConcurrentTasks;
     defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks";
-    defaultMaxConcurrentTasks.value = DEFAULT_MAX_CONCURRENT_TASKS;
+    defaultMaxConcurrentTasks.value = core::DEFAULT_MAX_CONCURRENT_TASKS;
 
     schedulingDefaults.children.push_back(defaultMaxConcurrentTasks);
 
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 2b4a9d7d4..4d82ea394 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -44,15 +44,6 @@ class YamlConfiguration : public flow::StructuredConfiguration {
 
   ~YamlConfiguration() override = default;
 
-  /**
-   * Returns a shared pointer to a ProcessGroup object containing the
-   * flow configuration.
-   *
-   * @return               the root ProcessGroup node of the flow
-   *                        configuration tree
-   */
-  std::unique_ptr<core::ProcessGroup> getRoot() override;
-
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
    * flow configuration. The yamlConfigStream argument must point to
diff --git a/libminifi/include/core/yaml/YamlNode.h b/libminifi/include/core/yaml/YamlNode.h
index ad8422f6e..193c61d55 100644
--- a/libminifi/include/core/yaml/YamlNode.h
+++ b/libminifi/include/core/yaml/YamlNode.h
@@ -49,6 +49,10 @@ class YamlNode : public flow::Node::NodeImpl {
     return node_.IsNull();
   }
 
+  flow::Node createEmpty() const override {  // a fresh Node wrapping an Undefined-typed YAML::Node
+    return flow::Node{std::make_shared<YamlNode>(YAML::Node{YAML::NodeType::Undefined})};
+  }
+
   nonstd::expected<std::string, std::exception_ptr> getString() const override {
     try {
       return node_.as<std::string>();
@@ -81,6 +85,14 @@ class YamlNode : public flow::Node::NodeImpl {
     }
   }
 
+  nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override {  // conversion is left entirely to yaml-cpp's as<std::string>()
+    try {
+      return node_.as<std::string>();
+    } catch (...) {
+      return nonstd::make_unexpected(std::current_exception());  // conversion failures become the error value instead of propagating
+    }
+  }
+
   std::string getDebugString() const override {
     if (!node_) return "<invalid>";
     if (node_.IsNull()) return "null";
@@ -149,11 +161,11 @@ class YamlIterator : public flow::Node::Iterator::IteratorImpl {
   YAML::const_iterator it_;
 };
 
-flow::Node::Iterator YamlNode::begin() const {
+inline flow::Node::Iterator YamlNode::begin() const {  // inline, since this out-of-class definition sits in a header (YamlNode.h)
   return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.begin())};
 }
 
-flow::Node::Iterator YamlNode::end() const {
+inline flow::Node::Iterator YamlNode::end() const {  // inline, since this out-of-class definition sits in a header (YamlNode.h)
   return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.end())};
 }
 
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 81e2bb1e0..277184996 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -29,7 +29,7 @@
 #include "io/StreamFactory.h"
 
 #include "core/yaml/YamlConfiguration.h"
-#include "core/json/JsonConfiguration.h"
+#include "core/flow/AdaptiveConfiguration.h"
 
 namespace org::apache::nifi::minifi::core {
 
@@ -38,13 +38,7 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura
   if (configuration_class_name) {
     class_name_lc = configuration_class_name.value();
   } else if (ctx.path) {
-    if (utils::StringUtils::endsWith(ctx.path->string(), ".yml")) {
-      class_name_lc = "yamlconfiguration";
-    } else if (utils::StringUtils::endsWith(ctx.path->string(), ".json")) {
-      class_name_lc = "jsonconfiguration";
-    } else {
-      throw std::runtime_error("Could not infer config type from file path");
-    }
+    class_name_lc = "adaptiveconfiguration";
   } else {
     throw std::runtime_error("Neither configuration class nor config file path has been specified");
   }
@@ -57,8 +51,8 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
       return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(ctx));
-    } else if (class_name_lc == "jsonconfiguration") {
-      return std::unique_ptr<core::JsonConfiguration>(instantiate<core::JsonConfiguration>(ctx));
+    } else if (class_name_lc == "adaptiveconfiguration") {
+      return std::unique_ptr<core::flow::AdaptiveConfiguration>(instantiate<core::flow::AdaptiveConfiguration>(ctx));
     } else {
       if (fail_safe) {
         return std::make_unique<core::FlowConfiguration>(ctx);
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7300d2129..509c0542d 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -33,6 +33,8 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
+constexpr int DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS = 30000;  // 30 s; the constructors below use it when there is no parent group to inherit the period from
+
 std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
 
 ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid)
@@ -54,7 +56,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils:
   if (parent_process_group_ != nullptr) {
     onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod();
   } else {
-    onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
+    onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
   }
   transmitting_ = false;
   transport_protocol_ = "RAW";
@@ -69,7 +71,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name)
       parent_process_group_(nullptr),
       logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
   yield_period_msec_ = 0ms;
-  onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
+  onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
   transmitting_ = false;
   transport_protocol_ = "RAW";
 
diff --git a/libminifi/src/core/flow/AdaptiveConfiguration.cpp b/libminifi/src/core/flow/AdaptiveConfiguration.cpp
new file mode 100644
index 000000000..cef220bb4
--- /dev/null
+++ b/libminifi/src/core/flow/AdaptiveConfiguration.cpp
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flow/AdaptiveConfiguration.h"
+
+#include "rapidjson/document.h"
+#include "core/json/JsonNode.h"
+#include "core/yaml/YamlNode.h"
+#include "yaml-cpp/yaml.h"
+#include "utils/file/FileUtils.h"
+#include "Defaults.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+AdaptiveConfiguration::AdaptiveConfiguration(ConfigurationContext ctx)
+    : StructuredConfiguration(([&] {  // immediately-invoked lambda: fills in ctx.path before ctx is handed to the base class
+        if (!ctx.path) {
+          if (utils::file::exists(DEFAULT_NIFI_CONFIG_JSON)) {  // the json default is chosen when that file exists
+            ctx.path = DEFAULT_NIFI_CONFIG_JSON;
+          } else {
+            ctx.path = DEFAULT_NIFI_CONFIG_YML;
+          }
+        }
+        return std::move(ctx);  // explicit move: ctx is a by-reference capture, not a local of the lambda
+      })(),
+      logging::LoggerFactory<AdaptiveConfiguration>::getLogger()) {}
+
+std::unique_ptr<core::ProcessGroup> AdaptiveConfiguration::getRootFromPayload(const std::string &payload) {  // json is tried first, yaml only if json parsing fails
+  try {
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(payload.c_str(), payload.length());
+    if (res) {  // the payload is well-formed json
+      flow::Node root{std::make_shared<JsonNode>(&doc)};
+      if (root[FlowSchema::getDefault().flow_header]) {  // the default schema's flow header is present, so use the default layout
+        logger_->log_debug("Processing configuration as default json");
+        return getRootFrom(root, FlowSchema::getDefault());
+      } else {
+        logger_->log_debug("Processing configuration as nifi flow json");
+        return getRootFrom(root, FlowSchema::getNiFiFlowJson());
+      }
+    }
+    logger_->log_debug("Could not parse configuration as json, trying yaml");
+    YAML::Node rootYamlNode = YAML::Load(payload);
+    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+    return getRootFrom(root, FlowSchema::getDefault());  // yaml is always read with the default schema
+  } catch(...) {  // any failure above: log it, then rethrow the same exception to the caller
+    logger_->log_error("Invalid configuration file");
+    throw;
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/CheckRequiredField.cpp b/libminifi/src/core/flow/CheckRequiredField.cpp
index 6942a5844..4539d9d78 100644
--- a/libminifi/src/core/flow/CheckRequiredField.cpp
+++ b/libminifi/src/core/flow/CheckRequiredField.cpp
@@ -27,7 +27,7 @@ bool isFieldPresent(const Node &node, std::string_view field_name) {
   return bool{node[field_name]};
 }
 
-std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section) {
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names) {
   const Node name_node = node["name"];
   // Build a helpful error message for the user so they can fix the
   // invalid config file, using the component name if present
@@ -36,32 +36,32 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> &
       name_node ?
           "Unable to parse configuration file for component named '" + name_node.getString().value() + "' as none of the possible required fields [" + field_list_string + "] is available" :
           "Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available";
-  if (!section.empty()) {
-    err_msg += " [in '" + std::string(section) + "' section of configuration file]";
-  }
+
+  err_msg += " [in '" + node.getPath() + "' section of configuration file]";
+
   if (auto cursor = node.getCursor()) {
     err_msg += " [line:column, pos at " + std::to_string(cursor->line) + ":" + std::to_string(cursor->column) + ", " + std::to_string(cursor->pos) + "]";
   }
   return err_msg;
 }
 
-void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section, std::string_view error_message) {
-  if (!isFieldPresent(node, field_name)) {
+void checkRequiredField(const Node &node, const std::vector<std::string>& field_names, std::string_view error_message) {  // throws if none of field_names is present
+  if (std::none_of(field_names.begin(), field_names.end(), [&] (auto& field) {return isFieldPresent(node, field);})) {  // NOTE(review): also true for an empty list
     if (error_message.empty()) {
-      throw std::invalid_argument(buildErrorMessage(node, std::vector<std::string>{std::string(field_name)}, section));
+      throw std::invalid_argument(buildErrorMessage(node, field_names));  // NOTE(review): error_message.data() below assumes a NUL-terminated view
     }
     throw std::invalid_argument(error_message.data());
   }
 }
 
-std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message) {
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message) {  // value of the first present name
   for (const auto& name : alternate_names) {
     if (isFieldPresent(node, name)) {
       return node[name].getString().value();
     }
   }
   if (error_message.empty()) {
-    throw std::invalid_argument(buildErrorMessage(node, alternate_names, section));
+    throw std::invalid_argument(buildErrorMessage(node, alternate_names));  // NOTE(review): error_message.data() below assumes a NUL-terminated view
   }
   throw std::invalid_argument(error_message.data());
 }
diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp
new file mode 100644
index 000000000..a5df0dc64
--- /dev/null
+++ b/libminifi/src/core/flow/FlowSchema.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flow/FlowSchema.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+FlowSchema FlowSchema::getDefault() {  // key names of the default config layout; each field lists the accepted key name(s)
+  return FlowSchema{
+      .flow_header = {"Flow Controller"},
+      .root_group = {"."},  // NOTE(review): "." presumably addresses the document root itself - confirm in Node::operator[]
+
+      .processors = {"Processors"},
+      .processor_properties = {"Properties"},
+      .autoterminated_rels = {"auto-terminated relationships list"},
+      .max_concurrent_tasks = {"max concurrent tasks"},
+      .penalization_period = {"penalization period"},
+      .proc_yield_period = {"yield period"},
+      .runduration_nanos = {"run duration nanos"},
+      .onschedule_retry_interval = {"onschedule retry interval"},
+
+      .connections = {"Connections"},
+      .max_queue_size = {"max work queue size"},
+      .max_queue_data_size = {"max work queue data size"},
+      .swap_threshold = {"swap threshold"},
+      .source_id = {"source id"},
+      .source_name = {"source name"},
+      .destination_id = {"destination id"},
+      .destination_name = {"destination name"},
+      .flowfile_expiration = {"flowfile expiration"},
+      .drop_empty = {"drop empty"},
+      .source_relationship = {"source relationship name"},
+      .source_relationship_list = {"source relationship names"},
+
+      .process_groups = {"Process Groups"},
+      .process_group_version = {"version"},
+      .scheduling_strategy = {"scheduling strategy"},
+      .scheduling_period = {"scheduling period"},
+      .name = {"name"},
+      .identifier = {"id"},
+      .type = {"class", "type"},  // two alternative key names for the same field
+      .controller_services = {"Controller Services"},
+      .controller_service_properties = {"Properties"},
+      .remote_process_group = {"Remote Processing Groups", "Remote Process Groups"},  // both spellings are listed as alternatives
+      .provenance_reporting = {"Provenance Reporting"},
+      .provenance_reporting_port_uuid = {"port uuid"},
+      .provenance_reporting_batch_size = {"batch size"},
+      .funnels = {"Funnels"},
+      .input_ports = {"Input Ports"},
+      .output_ports = {"Output Ports"},
+
+      .rpg_url = {"url"},
+      .rpg_yield_period = {"yield period"},
+      .rpg_timeout = {"timeout"},
+      .rpg_local_network_interface = {"local network interface"},
+      .rpg_transport_protocol = {"transport protocol"},
+      .rpg_proxy_host = {"proxy host"},
+      .rpg_proxy_user = {"proxy user"},
+      .rpg_proxy_password = {"proxy password"},
+      .rpg_proxy_port = {"proxy port"},
+      .rpg_input_ports = {"Input Ports"},
+      .rpg_output_ports = {"Output Ports"},
+      .rpg_port_properties = {"Properties"},
+      .rpg_port_target_id = {}  // empty list: this layout defines no key for the field
+  };
+}
+
+FlowSchema FlowSchema::getNiFiFlowJson() {  // key names of the NiFi flow json layout; an empty list means the layout has no key for that field
+  return FlowSchema{
+      .flow_header = {"rootGroup"},  // header and root group use the same "rootGroup" key in this layout
+      .root_group = {"rootGroup"},
+      .processors = {"processors"},
+      .processor_properties = {"properties"},
+      .autoterminated_rels = {"autoTerminatedRelationships"},
+      .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"},
+      .penalization_period = {"penaltyDuration"},
+      .proc_yield_period = {"yieldDuration"},
+      // TODO(adebreceni): MINIFICPP-2033 the value is currently unused, so the nanos-vs-millis mismatch is not an issue yet
+      .runduration_nanos = {"runDurationMillis"},
+      .onschedule_retry_interval = {},
+
+      .connections = {"connections"},
+      .max_queue_size = {"backPressureObjectThreshold"},
+      .max_queue_data_size = {"backPressureDataSizeThreshold"},
+      .swap_threshold = {},
+      .source_id = {"source/id"},  // NOTE(review): "/" presumably separates nested keys - confirm in Node::operator[]
+      .source_name = {"source/name"},
+      .destination_id = {"destination/id"},
+      .destination_name = {"destination/name"},
+      .flowfile_expiration = {"flowFileExpiration"},
+      // unlike NiFi, we also accept dropEmpty in the flow json
+      .drop_empty = {"dropEmpty"},
+      .source_relationship = {},
+      .source_relationship_list = {"selectedRelationships"},
+
+      .process_groups = {"processGroups"},
+      .process_group_version = {},
+      .scheduling_strategy = {"schedulingStrategy"},
+      .scheduling_period = {"schedulingPeriod"},
+      .name = {"name"},
+      .identifier = {"identifier"},
+      .type = {"type"},
+      .controller_services = {"controllerServices"},
+      .controller_service_properties = {"properties"},
+      .remote_process_group = {"remoteProcessGroups"},
+      .provenance_reporting = {},
+      .provenance_reporting_port_uuid = {},
+      .provenance_reporting_batch_size = {},
+      .funnels = {"funnels"},
+      .input_ports = {"inputPorts"},
+      .output_ports = {"outputPorts"},
+
+      .rpg_url = {"targetUri"},
+      .rpg_yield_period = {"yieldDuration"},
+      .rpg_timeout = {"communicationsTimeout"},
+      .rpg_local_network_interface = {"localNetworkInterface"},
+      .rpg_transport_protocol = {"transportProtocol"},
+      .rpg_proxy_host = {"proxyHost"},
+      .rpg_proxy_user = {"proxyUser"},
+      .rpg_proxy_password = {"proxyPassword"},
+      .rpg_proxy_port = {"proxyPort"},
+      .rpg_input_ports = {"inputPorts"},
+      .rpg_output_ports = {"outputPorts"},
+      .rpg_port_properties = {},
+      .rpg_port_target_id = {"targetId"}
+  };
+}
+
+}  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/Node.cpp b/libminifi/src/core/flow/Node.cpp
index 7fd7b0df8..1c751ebe0 100644
--- a/libminifi/src/core/flow/Node.cpp
+++ b/libminifi/src/core/flow/Node.cpp
@@ -21,7 +21,15 @@
 namespace org::apache::nifi::minifi::core::flow {
 
 Node::Iterator::Value Node::Iterator::operator*() const {
-  return impl_->operator*();
+  Value value = impl_->operator*();  // element from the underlying iterator; its path is filled in below
+  if (value) {
+    // sequence iterator: the element's path is path_/<index>
+    value.path_ = utils::StringUtils::join_pack(path_, "/", std::to_string(idx_));
+  } else if (value.second) {
+    // map iterator: the mapped value's path is path_/<key>
+    value.second.path_ = utils::StringUtils::join_pack(path_, "/", value.first.getString().value());
+  }
+  return value;
 }
 
 }  // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 7e1d41e63..02624a4dc 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -34,13 +34,26 @@ namespace org::apache::nifi::minifi::core::flow {
 
 std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
 
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRoot() {  // reads config_path_ through filesystem_ and parses the content
+  if (!config_path_) {
+    logger_->log_error("Cannot instantiate flow, no config file is set.");
+    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
+  }
+  const auto configuration = filesystem_->read(config_path_.value());
+  if (!configuration) {
+    // a missing flow config file is not fatal: the caller might fetch the flow from the network
+    return nullptr;
+  }
+  return getRootFromPayload(configuration.value());  // parsing of the content is delegated to getRootFromPayload
+}
+
 StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger)
     : FlowConfiguration(std::move(ctx)),
       logger_(std::move(logger)) {}
 
 std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) {
-  auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY];
-  auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true);
+  checkRequiredField(root_flow_node, schema_.flow_header);  // throws if the schema's flow header key is absent
+  auto root_group = parseProcessGroup(root_flow_node[schema_.flow_header], root_flow_node[schema_.root_group], true);  // header node, then contents node
   this->name_ = root_group->getName();
   return root_group;
 }
@@ -48,15 +61,15 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGro
 std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) {
   int version = 0;
 
-  checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-  auto flowName = node["name"].getString().value();
+  checkRequiredField(node, schema_.name);
+  auto flowName = node[schema_.name].getString().value();
 
   utils::Identifier uuid;
   // assignment throws on invalid uuid
   uuid = getOrGenerateId(node);
 
-  if (node["version"]) {
-    version = gsl::narrow<int>(node["version"].getInt64().value());
+  if (node[schema_.process_group_version]) {
+    version = gsl::narrow<int>(node[schema_.process_group_version].getInt64().value());
   }
 
   logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
@@ -67,8 +80,8 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(
     group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
   }
 
-  if (node["onschedule retry interval"]) {
-    auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value();
+  if (node[schema_.onschedule_retry_interval]) {
+    auto onScheduleRetryPeriod = node[schema_.onschedule_retry_interval].getString().value();
     logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
 
     auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
@@ -83,20 +96,13 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(
 
 std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& header_node, const Node& node, bool is_root) {
   auto group = createProcessGroup(header_node, is_root);
-  Node processorsNode = node[CONFIG_PROCESSORS_KEY];
-  Node connectionsNode = node[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY];
-  Node funnelsNode = node[CONFIG_FUNNELS_KEY];
-  Node inputPortsNode = node[CONFIG_INPUT_PORTS_KEY];
-  Node outputPortsNode = node[CONFIG_OUTPUT_PORTS_KEY];
-  Node remoteProcessingGroupsNode = [&] {
-    // assignment is not supported on invalid nodes
-    Node candidate = node[CONFIG_REMOTE_PROCESS_GROUP_KEY];
-    if (candidate) {
-      return candidate;
-    }
-    return node[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3];
-  }();
-  Node childProcessGroupNodeSeq = node["Process Groups"];
+  Node processorsNode = node[schema_.processors];
+  Node connectionsNode = node[schema_.connections];
+  Node funnelsNode = node[schema_.funnels];
+  Node inputPortsNode = node[schema_.input_ports];
+  Node outputPortsNode = node[schema_.output_ports];
+  Node remoteProcessingGroupsNode = node[schema_.remote_process_group];
+  Node childProcessGroupNodeSeq = node[schema_.process_groups];
 
   parseProcessorNode(processorsNode, group.get());
   parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get());
@@ -115,10 +121,11 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(c
   return group;
 }
 
-std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node, FlowSchema schema) {
+  schema_ = std::move(schema);
   uuids_.clear();
-  Node controllerServiceNode = root_node[CONFIG_CONTROLLER_SERVICES_KEY];
-  Node provenanceReportNode = root_node[CONFIG_PROVENANCE_REPORT_KEY];
+  Node controllerServiceNode = root_node[schema_.root_group][schema_.controller_services];
+  Node provenanceReportNode = root_node[schema_.provenance_reporting];
 
   parseControllerServices(controllerServiceNode);
   // Create the root process group
@@ -157,14 +164,14 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
   for (const auto& procNode : processors_node) {
     core::ProcessorConfig procCfg;
 
-    checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY);
-    procCfg.name = procNode["name"].getString().value();
+    checkRequiredField(procNode, schema_.name);
+    procCfg.name = procNode[schema_.name].getString().value();
     procCfg.id = getOrGenerateId(procNode);
 
     uuid = procCfg.id;
     logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
-    checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY);
-    procCfg.javaClass = procNode["class"].getString().value();
+    checkRequiredField(procNode, schema_.type);
+    procCfg.javaClass = procNode[schema_.type].getString().value();
     logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
 
     // Determine the processor name only from the Java class
@@ -187,36 +194,35 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
 
     processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
 
-    procCfg.schedulingStrategy = getOptionalField(procNode, "scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY);
+    procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY);
     logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
-    procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY);
+    procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR);
 
     logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
 
-    if (auto tasksNode = procNode["max concurrent tasks"]) {
+    if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) {
       procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
     }
 
-    if (procNode["penalization period"]) {
-      procCfg.penalizationPeriod = procNode["penalization period"].getString().value();
+    if (auto penalizationNode = procNode[schema_.penalization_period]) {
+      procCfg.penalizationPeriod = penalizationNode.getString().value();
       logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
     }
 
-    if (procNode["yield period"]) {
-      procCfg.yieldPeriod = procNode["yield period"].getString().value();
+    if (auto yieldNode = procNode[schema_.proc_yield_period]) {
+      procCfg.yieldPeriod = yieldNode.getString().value();
       logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
     }
 
-    if (auto runNode = procNode["run duration nanos"]) {
+    if (auto runNode = procNode[schema_.runduration_nanos]) {
       procCfg.runDurationNanos = runNode.getIntegerAsString().value();
       logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
     }
 
     // handle auto-terminated relationships
-    if (procNode["auto-terminated relationships list"]) {
-      Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+    if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) {
       std::vector<std::string> rawAutoTerminatedRelationshipValues;
       if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
         for (const auto& autoTerminatedRel : autoTerminatedSequence) {
@@ -227,9 +233,8 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
     }
 
     // handle processor properties
-    if (procNode["Properties"]) {
-      Node propertiesNode = procNode["Properties"];
-      parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY);
+    if (Node propertiesNode = procNode[schema_.processor_properties]) {
+      parsePropertiesNode(propertiesNode, *processor, procCfg.name);
     }
 
     // Take care of scheduling
@@ -304,13 +309,13 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
     return;
   }
   for (const auto& currRpgNode : rpg_node_seq) {
-    checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-    auto name = currRpgNode["name"].getString().value();
+    checkRequiredField(currRpgNode, schema_.name);
+    auto name = currRpgNode[schema_.name].getString().value();
     id = getOrGenerateId(currRpgNode);
 
     logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id);
 
-    auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+    auto url = getOptionalField(currRpgNode, schema_.rpg_url, "");
 
     logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url);
 
@@ -318,8 +323,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
     auto group = createRemoteProcessGroup(name, uuid);
     group->setParent(parentGroup);
 
-    if (currRpgNode["yield period"]) {
-      auto yieldPeriod = currRpgNode["yield period"].getString().value();
+    if (currRpgNode[schema_.rpg_yield_period]) {
+      auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod);
 
       auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
@@ -329,8 +334,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
       }
     }
 
-    if (currRpgNode["timeout"]) {
-      auto timeout = currRpgNode["timeout"].getString().value();
+    if (currRpgNode[schema_.rpg_timeout]) {
+      auto timeout = currRpgNode[schema_.rpg_timeout].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout);
 
       auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
@@ -340,33 +345,33 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
       }
     }
 
-    if (currRpgNode["local network interface"]) {
-      auto interface = currRpgNode["local network interface"].getString().value();
+    if (currRpgNode[schema_.rpg_local_network_interface]) {
+      auto interface = currRpgNode[schema_.rpg_local_network_interface].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", interface);
       group->setInterface(interface);
     }
 
-    if (currRpgNode["transport protocol"]) {
-      auto transport_protocol = currRpgNode["transport protocol"].getString().value();
+    if (currRpgNode[schema_.rpg_transport_protocol]) {
+      auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value();
       logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol);
       if (transport_protocol == "HTTP") {
         group->setTransportProtocol(transport_protocol);
-        if (currRpgNode["proxy host"]) {
-          auto http_proxy_host = currRpgNode["proxy host"].getString().value();
+        if (currRpgNode[schema_.rpg_proxy_host]) {
+          auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value();
           logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host);
           group->setHttpProxyHost(http_proxy_host);
-          if (currRpgNode["proxy user"]) {
-            auto http_proxy_username = currRpgNode["proxy user"].getString().value();
+          if (currRpgNode[schema_.rpg_proxy_user]) {
+            auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value();
             logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username);
             group->setHttpProxyUserName(http_proxy_username);
           }
-          if (currRpgNode["proxy password"]) {
-            auto http_proxy_password = currRpgNode["proxy password"].getString().value();
+          if (currRpgNode[schema_.rpg_proxy_password]) {
+            auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value();
             logger_->log_debug("parseRemoteProcessGroup: proxy password => [%s]", http_proxy_password);
             group->setHttpProxyPassWord(http_proxy_password);
           }
-          if (currRpgNode["proxy port"]) {
-            auto http_proxy_port = currRpgNode["proxy port"].getIntegerAsString().value();
+          if (currRpgNode[schema_.rpg_proxy_port]) {
+            auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value();
             int32_t port;
             if (core::Property::StringToInt(http_proxy_port, port)) {
               logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port);
@@ -386,19 +391,19 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
     group->setTransmitting(true);
     group->setURL(url);
 
-    checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-    auto inputPorts = currRpgNode["Input Ports"];
+    checkRequiredField(currRpgNode, schema_.rpg_input_ports);
+    auto inputPorts = currRpgNode[schema_.rpg_input_ports];
     if (inputPorts && inputPorts.isSequence()) {
       for (const auto& currPort : inputPorts) {
-        parsePort(currPort, group.get(), sitetosite::SEND);
+        parseRPGPort(currPort, group.get(), sitetosite::SEND);
       }  // for node
     }
-    auto outputPorts = currRpgNode["Output Ports"];
+    auto outputPorts = currRpgNode[schema_.rpg_output_ports];
     if (outputPorts && outputPorts.isSequence()) {
       for (const auto& currPort : outputPorts) {
         logger_->log_debug("Got a current port, iterating...");
 
-        parsePort(currPort, group.get(), sitetosite::RECEIVE);
+        parseRPGPort(currPort, group.get(), sitetosite::RECEIVE);
       }  // for node
     }
     parentGroup->addProcessGroup(std::move(group));
@@ -420,10 +425,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
 
   auto reportTask = createProvenanceReportTask();
 
-  checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY);
-  auto schedulingStrategyStr = node["scheduling strategy"].getString().value();
-  checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY);
-  auto schedulingPeriodStr = node["scheduling period"].getString().value();
+  checkRequiredField(node, schema_.scheduling_strategy);
+  auto schedulingStrategyStr = node[schema_.scheduling_strategy].getString().value();
+  checkRequiredField(node, schema_.scheduling_period);
+  auto schedulingPeriodStr = node[schema_.scheduling_period].getString().value();
 
   if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
     logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
@@ -456,10 +461,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
       logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
     }
   }
-  checkRequiredField(node, "port uuid", CONFIG_PROVENANCE_REPORT_KEY);
-  auto portUUIDStr = node["port uuid"].getString().value();
-  checkRequiredField(node, "batch size", CONFIG_PROVENANCE_REPORT_KEY);
-  auto batchSizeStr = node["batch size"].getString().value();
+  checkRequiredField(node, schema_.provenance_reporting_port_uuid);
+  auto portUUIDStr = node[schema_.provenance_reporting_port_uuid].getString().value();
+  checkRequiredField(node, schema_.provenance_reporting_batch_size);
+  auto batchSizeStr = node[schema_.provenance_reporting_batch_size].getString().value();
 
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
   port_uuid = portUUIDStr;
@@ -481,9 +486,9 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
     return;
   }
   for (const auto& service_node : controller_services_node) {
-    checkRequiredField(service_node, "name", CONFIG_CONTROLLER_SERVICES_KEY);
+    checkRequiredField(service_node, schema_.name);
 
-    auto type = getRequiredField(service_node, std::vector<std::string>{"class", "type"}, CONFIG_CONTROLLER_SERVICES_KEY);
+    auto type = getRequiredField(service_node, schema_.type);
     logger_->log_debug("Using type %s for controller service node", type);
 
     std::string fullType = type;
@@ -493,8 +498,8 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
       type = type.substr(lastOfIdx);
     }
 
-    auto name = service_node["name"].getString().value();
-    auto id = getRequiredIdField(service_node, CONFIG_CONTROLLER_SERVICES_KEY);
+    auto name = service_node[schema_.name].getString().value();
+    auto id = getRequiredIdField(service_node);
 
     utils::Identifier uuid;
     uuid = id;
@@ -502,11 +507,11 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
     if (nullptr != controller_service_node) {
       logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
       controller_service_node->initialize();
-      if (Node propertiesNode = service_node["Properties"]) {
+      if (Node propertiesNode = service_node[schema_.controller_service_properties]) {
         // we should propagate properties to the node and to the implementation
-        parsePropertiesNode(propertiesNode, *controller_service_node, name, CONFIG_CONTROLLER_SERVICES_KEY);
+        parsePropertiesNode(propertiesNode, *controller_service_node, name);
         if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
-          parsePropertiesNode(propertiesNode, *controllerServiceImpl, name, CONFIG_CONTROLLER_SERVICES_KEY);
+          parsePropertiesNode(propertiesNode, *controllerServiceImpl, name);
         }
       }
     } else {
@@ -538,7 +543,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
 
     // Default name to be same as ID
     // If name is specified in configuration, use the value
-    const auto name = connection_node["name"].getString().value_or(id);
+    const auto name = connection_node[schema_.name].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect connection UUID format.");
@@ -547,7 +552,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
 
     auto connection = createConnection(name, uuid.value());
     logger_->log_debug("Created connection with UUID %s and name %s", id, name);
-    const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+    const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_);
     connectionParser.configureConnectionSourceRelationships(*connection);
     connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
     connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
@@ -561,7 +566,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
   }
 }
 
-void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
+void StructuredConfiguration::parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
   utils::Identifier uuid;
 
   if (!parent) {
@@ -570,9 +575,9 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
   }
 
   // Check for required fields
-  checkRequiredField(port_node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
-  auto nameStr = port_node["name"].getString().value();
-  auto portId = getRequiredIdField(port_node, CONFIG_REMOTE_PROCESS_GROUP_KEY,
+  checkRequiredField(port_node, schema_.name);
+  auto nameStr = port_node[schema_.name].getString().value();
+  auto portId = getRequiredIdField(port_node,
     "The field 'id' is required for "
     "the port named '" + nameStr + "' in the Flow Config. If this port "
     "is an input port for a NiFi Remote Process Group, the port "
@@ -597,8 +602,11 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
   // else defaults to RAW
 
   // handle port properties
-  if (Node propertiesNode = port_node["Properties"]) {
-    parsePropertiesNode(propertiesNode, *port, nameStr, CONFIG_REMOTE_PROCESS_GROUP_KEY);
+  if (Node propertiesNode = port_node[schema_.rpg_port_properties]) {
+    parsePropertiesNode(propertiesNode, *port, nameStr);
+  } else {
+    parsePropertyNodeElement(minifi::RemoteProcessorGroupPort::portUUID.getName(), port_node[schema_.rpg_port_target_id], *port);
+    validateComponentProperties(*port, nameStr, port_node.getPath());
   }
 
   // add processor to parent
@@ -606,7 +614,7 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
   parent->addProcessor(std::move(port));
   processor.setScheduledState(core::RUNNING);
 
-  if (auto tasksNode = port_node["max concurrent tasks"]) {
+  if (auto tasksNode = port_node[schema_.max_concurrent_tasks]) {
     std::string rawMaxConcurrentTasks = tasksNode.getIntegerAsString().value();
     int32_t maxConcurrentTasks;
     if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
@@ -658,7 +666,7 @@ PropertyValue StructuredConfiguration::getValidatedProcessorPropertyForDefaultTy
     } else if (defaultType == Value::BOOL_TYPE && property_value_node.getBool()) {
       coercedValue = property_value_node.getBool().value();
     } else {
-      coercedValue = property_value_node.getString().value();
+      coercedValue = property_value_node.getScalarAsString().value();
     }
     return coercedValue;
   } catch (const std::exception& e) {
@@ -687,7 +695,7 @@ void StructuredConfiguration::parseSingleProperty(const std::string& property_na
     throw;
   }
   if (!property_set) {
-    const auto rawValueString = property_value_node.getString().value();
+    const auto rawValueString = property_value_node.getScalarAsString().value();
     auto proc = dynamic_cast<core::Connectable*>(&processor);
     if (proc) {
       logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
@@ -714,7 +722,7 @@ void StructuredConfiguration::parsePropertyNodeElement(const std::string& proper
   }
 }
 
-void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section) {
+void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name) {
   // Treat generically as a node so we can perform inspection on entries to ensure they are populated
   logger_->log_trace("Entered %s", component_name);
   for (const auto& property_node : properties_node) {
@@ -723,7 +731,7 @@ void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, c
     parsePropertyNodeElement(propertyName, propertyValueNode, component);
   }
 
-  validateComponentProperties(component, component_name, section);
+  validateComponentProperties(component, component_name, properties_node.getPath());
 }
 
 void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* parent) {
@@ -739,7 +747,7 @@ void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup*
     std::string id = getOrGenerateId(funnel_node);
 
     // Default name to be same as ID
-    const auto name = funnel_node["name"].getString().value_or(id);
+    const auto name = funnel_node[schema_.name].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect funnel UUID format.");
@@ -767,7 +775,7 @@ void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGr
     std::string id = getOrGenerateId(port_node);
 
     // Default name to be same as ID
-    const auto name = port_node["name"].getString().value_or(id);
+    const auto name = port_node[schema_.name].getString().value_or(id);
 
     const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
       logger_->log_debug("Incorrect port UUID format.");
@@ -861,14 +869,14 @@ void StructuredConfiguration::raiseComponentError(const std::string &component_n
   throw std::invalid_argument(err_msg);
 }
 
-std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std::string& id_field) {
-  if (node[id_field]) {
-    if (auto opt_id_str = node[id_field].getString()) {
+std::string StructuredConfiguration::getOrGenerateId(const Node& node) {
+  if (node[schema_.identifier]) {
+    if (auto opt_id_str = node[schema_.identifier].getString()) {
       auto id = opt_id_str.value();
       addNewId(id);
       return id;
     }
-    throw std::invalid_argument("getOrGenerateId: idField '" + id_field + "' is expected to contain string.");
+    throw std::invalid_argument("getOrGenerateId: idField '" + utils::StringUtils::join(",", schema_.identifier) + "' is expected to contain string.");
   }
 
   auto id = id_generator_->generate().to_string();
@@ -876,27 +884,24 @@ std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std
   return id;
 }
 
-std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view section, std::string_view error_message) {
-  checkRequiredField(node, "id", section, error_message);
-  auto id = node["id"].getString().value();
+std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view error_message) {
+  checkRequiredField(node, schema_.identifier, error_message);
+  auto id = node[schema_.identifier].getString().value();
   addNewId(id);
   return id;
 }
 
-std::string StructuredConfiguration::getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section,
-                                               const std::string& info_message) {
+std::string StructuredConfiguration::getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message) {
   std::string infoMessage = info_message;
   auto result = node[field_name];
   if (!result) {
     if (infoMessage.empty()) {
       // Build a helpful info message for the user to inform them that a default is being used
-      infoMessage =
-          node["name"] ?
-              "Using default value for optional field '" + field_name + "' in component named '" + node["name"].getString().value() + "'" :
-              "Using default value for optional field '" + field_name + "' ";
-      if (!section.empty()) {
-        infoMessage += " [in '" + section + "' section of configuration file]: ";
+      infoMessage = "Using default value for optional field '" + utils::StringUtils::join(",", field_name) + "'";
+      if (auto name = node["name"]) {
+        infoMessage += "' in component named '" + name.getString().value() + "'";
       }
+      infoMessage += " [in '" + node.getPath() + "' section of configuration file]: ";
 
       infoMessage += default_value;
     }
diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
index f39a3ac8b..a6521884b 100644
--- a/libminifi/src/core/flow/StructuredConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -49,10 +49,9 @@ void StructuredConnectionParser::addFunnelRelationshipToConnection(minifi::Conne
 
 void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::Connection& connection) const {
   // Configure connection source
-  if (connectionNode_["source relationship name"] && !connectionNode_["source relationship name"].getString().value().empty()) {
-    addNewRelationshipToConnection(connectionNode_["source relationship name"].getString().value(), connection);
-  } else if (connectionNode_["source relationship names"]) {
-    auto relList = connectionNode_["source relationship names"];
+  if (connectionNode_[schema_.source_relationship] && !connectionNode_[schema_.source_relationship].getString().value().empty()) {
+    addNewRelationshipToConnection(connectionNode_[schema_.source_relationship].getString().value(), connection);
+  } else if (auto relList = connectionNode_[schema_.source_relationship_list]) {
     if (relList.isSequence() && !relList.empty()) {
       for (const auto &rel : relList) {
         addNewRelationshipToConnection(rel.getString().value(), connection);
@@ -68,7 +67,7 @@ void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::
 }
 
 uint64_t StructuredConnectionParser::getWorkQueueSize() const {
-  if (auto max_work_queue_data_size_node = connectionNode_["max work queue size"]) {
+  if (auto max_work_queue_data_size_node = connectionNode_[schema_.max_queue_size]) {
     std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
     uint64_t max_work_queue_size;
     if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
@@ -81,7 +80,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const {
 }
 
 uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
-  const flow::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  const flow::Node max_work_queue_data_size_node = connectionNode_[schema_.max_queue_data_size];
   if (max_work_queue_data_size_node) {
     std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
     uint64_t max_work_queue_data_size = 0;
@@ -95,7 +94,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
 }
 
 uint64_t StructuredConnectionParser::getSwapThreshold() const {
-  const flow::Node swap_threshold_node = connectionNode_["swap threshold"];
+  const flow::Node swap_threshold_node = connectionNode_[schema_.swap_threshold];
   if (swap_threshold_node) {
     auto swap_threshold_str = swap_threshold_node.getString().value();
     uint64_t swap_threshold;
@@ -109,7 +108,7 @@ uint64_t StructuredConnectionParser::getSwapThreshold() const {
 }
 
 utils::Identifier StructuredConnectionParser::getSourceUUID() const {
-  const flow::Node source_id_node = connectionNode_["source id"];
+  const flow::Node source_id_node = connectionNode_[schema_.source_id];
   if (source_id_node) {
     const auto srcUUID = utils::Identifier::parse(source_id_node.getString().value());
     if (srcUUID) {
@@ -120,8 +119,8 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const {
     throw std::invalid_argument("Invalid source id");
   }
   // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
-  checkRequiredField(connectionNode_, "source name", CONFIG_CONNECTIONS_KEY);
-  const auto connectionSrcProcName = connectionNode_["source name"].getString().value();
+  checkRequiredField(connectionNode_, schema_.source_name);
+  const auto connectionSrcProcName = connectionNode_[schema_.source_name].getString().value();
   const auto srcUUID = utils::Identifier::parse(connectionSrcProcName);
   if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the source name is a remote port id, so use that as the source id
@@ -141,7 +140,7 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const {
 }
 
 utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
-  const flow::Node destination_id_node = connectionNode_["destination id"];
+  const flow::Node destination_id_node = connectionNode_[schema_.destination_id];
   if (destination_id_node) {
     const auto destUUID = utils::Identifier::parse(destination_id_node.getString().value());
     if (destUUID) {
@@ -153,8 +152,8 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
   }
   // we use the same logic as above for resolving the source processor
   // for looking up the destination processor in absence of a processor id
-  checkRequiredField(connectionNode_, "destination name", CONFIG_CONNECTIONS_KEY);
-  auto connectionDestProcName = connectionNode_["destination name"].getString().value();
+  checkRequiredField(connectionNode_, schema_.destination_name);
+  auto connectionDestProcName = connectionNode_[schema_.destination_name].getString().value();
   const auto destUUID = utils::Identifier::parse(connectionDestProcName);
   if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
     // the destination name is a remote port id, so use that as the dest id
@@ -175,7 +174,7 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
 
 std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() const {
   using namespace std::literals::chrono_literals;
-  const flow::Node expiration_node = connectionNode_["flowfile expiration"];
+  const flow::Node expiration_node = connectionNode_[schema_.flowfile_expiration];
   if (!expiration_node) {
     logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
     return 0ms;
@@ -196,7 +195,7 @@ std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() co
 }
 
 bool StructuredConnectionParser::getDropEmpty() const {
-  const flow::Node drop_empty_node = connectionNode_["drop empty"];
+  const flow::Node drop_empty_node = connectionNode_[schema_.drop_empty];
   if (drop_empty_node) {
     return utils::StringUtils::toBool(drop_empty_node.getString().value()).value_or(false);
   }
diff --git a/libminifi/src/core/json/JsonConfiguration.cpp b/libminifi/src/core/json/JsonConfiguration.cpp
deleted file mode 100644
index 960f9a631..000000000
--- a/libminifi/src/core/json/JsonConfiguration.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <memory>
-#include <vector>
-#include <set>
-#include <cinttypes>
-#include <variant>
-
-#include "core/json/JsonConfiguration.h"
-#include "core/json/JsonNode.h"
-#include "core/state/Value.h"
-#include "Defaults.h"
-#include "utils/TimeUtil.h"
-
-#include "rapidjson/rapidjson.h"
-#include "rapidjson/document.h"
-
-namespace org::apache::nifi::minifi::core {
-
-namespace {
-
-}  // namespace
-
-
-JsonConfiguration::JsonConfiguration(ConfigurationContext ctx)
-    : StructuredConfiguration(([&] {
-                                if (!ctx.path) {
-                                  ctx.path = DEFAULT_NIFI_CONFIG_JSON;
-                                }
-                                return std::move(ctx);
-                              })(),
-                              logging::LoggerFactory<JsonConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRoot() {
-  if (!config_path_) {
-    logger_->log_error("Cannot instantiate flow, no config file is set.");
-    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
-  }
-  const auto configuration = filesystem_->read(config_path_.value());
-  if (!configuration) {
-    // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
-    return nullptr;
-  }
-  try {
-    rapidjson::Document doc;
-    rapidjson::ParseResult res = doc.Parse(configuration->c_str(), configuration->length());
-    if (!res) {
-      throw std::runtime_error("Could not parse json file");
-    }
-    flow::Node root{std::make_shared<JsonNode>(&doc)};
-    return getRootFrom(root);
-  } catch(...) {
-    logger_->log_error("Invalid json configuration file");
-    throw;
-  }
-}
-
-std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRootFromPayload(const std::string &json_config) {
-  try {
-    rapidjson::Document doc;
-    rapidjson::ParseResult res = doc.Parse(json_config.c_str(), json_config.length());
-    if (!res) {
-      throw std::runtime_error("Could not parse json file");
-    }
-    flow::Node root{std::make_shared<JsonNode>(&doc)};
-    return getRootFrom(root);
-  } catch (const std::runtime_error& err) {
-    logger_->log_error(err.what());
-    throw;
-  }
-}
-
-}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 89092eee5..f7bde6a66 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -40,31 +40,11 @@ YamlConfiguration::YamlConfiguration(ConfigurationContext ctx)
         })(),
         logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
 
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRoot() {
-  if (!config_path_) {
-    logger_->log_error("Cannot instantiate flow, no config file is set.");
-    throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
-  }
-  const auto configuration = filesystem_->read(config_path_.value());
-  if (!configuration) {
-  // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
-  return nullptr;
-  }
-  try {
-    YAML::Node rootYamlNode = YAML::Load(configuration.value());
-    flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
-    return getRootFrom(root);
-  } catch(...) {
-    logger_->log_error("Invalid yaml configuration file");
-    throw;
-  }
-}
-
 std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(std::istream &yamlConfigStream) {
   try {
     YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
     flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
-    return getRootFrom(root);
+    return getRootFrom(root, flow::FlowSchema::getDefault());
   } catch (const YAML::ParserException &pe) {
     logger_->log_error(pe.what());
     throw;
@@ -75,7 +55,7 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRootFromPayload(const
   try {
     YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
     flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
-    return getRootFrom(root);
+    return getRootFrom(root, flow::FlowSchema::getDefault());
   } catch (const YAML::ParserException &pe) {
     logger_->log_error(pe.what());
     throw;
diff --git a/libminifi/test/ConfigurationTestController.h b/libminifi/test/ConfigurationTestController.h
new file mode 100644
index 000000000..05e33214d
--- /dev/null
+++ b/libminifi/test/ConfigurationTestController.h
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "TestBase.h"
+#include "core/FlowConfiguration.h"
+#include "core/RepositoryFactory.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+class ConfigurationTestController : public TestController {
+ public:
+  ConfigurationTestController() {
+    flow_file_repo_ = core::createRepository("flowfilerepository");
+    configuration_ = std::make_shared<minifi::Configure>();
+    stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_);
+    content_repo_ = std::make_shared<core::repository::VolatileContentRepository>();
+
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setTrace<core::YamlConfiguration>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setTrace<core::flow::AdaptiveConfiguration>();
+  }
+
+  core::ConfigurationContext getContext() const {
+    return core::ConfigurationContext{
+        .flow_file_repo = flow_file_repo_,
+        .content_repo = content_repo_,
+        .stream_factory = stream_factory_,
+        .configuration = configuration_
+    };
+  }
+
+  std::shared_ptr<core::Repository> flow_file_repo_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+  std::shared_ptr<core::ContentRepository> content_repo_;
+};
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 63ee396b2..a3ed6a483 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -232,7 +232,7 @@ int main(int argc, char **argv) {
     std::string graceful_shutdown_seconds;
     std::string prov_repo_class = "provenancerepository";
     std::string flow_repo_class = "flowfilerepository";
-    std::string nifi_configuration_class_name = "yamlconfiguration";
+    std::string nifi_configuration_class_name = "adaptiveconfiguration";
     std::string content_repo_class = "filesystemrepository";
 
     auto log_properties = std::make_shared<core::logging::LoggerProperties>();


[nifi-minifi-cpp] 02/04: MINIFICPP-2034 Cache SID lookups in CWEL

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 5a9c1c33407e6a54b8de828b4620adeac659d814
Author: Ferenc Gerlits <fg...@gmail.com>
AuthorDate: Fri Feb 17 01:11:26 2023 +0100

    MINIFICPP-2034 Cache SID lookups in CWEL
    
    The SID -> Username lookup is a system call, which can be quite slow. As
    this mapping is unlikely to change without a Windows (and so MiNiFi)
    restart, we should cache it.
    
    Caching is quite simple: there is a hard-coded 24 hour cache expiry, the
    cache is in memory only, and the only way to clear it is to restart
    MiNiFi. We can improve this later if a user asks for it.
    
    Closes #1502
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  1 +
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 25 ++++++-
 .../windows-event-log/ConsumeWindowsEventLog.h     |  7 +-
 extensions/windows-event-log/tests/CMakeLists.txt  |  6 +-
 .../tests/ConsumeWindowsEventLogTests.cpp          |  3 +-
 .../windows-event-log/tests/LookupCacherTests.cpp  | 77 ++++++++++++++++++++++
 .../tests/MetadataWalkerTests.cpp                  | 24 ++-----
 extensions/windows-event-log/wel/LookupCacher.cpp  | 37 +++++++++++
 extensions/windows-event-log/wel/LookupCacher.h    | 48 ++++++++++++++
 .../windows-event-log/wel/MetadataWalker.cpp       |  5 +-
 extensions/windows-event-log/wel/MetadataWalker.h  | 10 +--
 11 files changed, 211 insertions(+), 32 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2e91293ee..a1878df3e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -407,6 +407,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 | Batch Commit Size                  | 1000                                                                                                                                                                                                                        |                                  | Maximum number of Events to consume and create to Flow Files from before committing.                                                                                                                 [...]
 | **Process Old Events**             | false                                                                                                                                                                                                                       | true<br>false                    | This property defines if old events (which are created before first time server is started) should be processed.                                                                                     [...]
 | State Directory                    | CWELState                                                                                                                                                                                                                   |                                  | DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.                                                                                               [...]
+| Cache SID Lookups                  | true                                                                                                                                                                                                                        | true<br>false                    | Determines whether SID to name lookups are cached in memory                                                                                                                                          [...]
 
 ### Relationships
 
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index fa48bf2ac..c1e93688a 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -33,6 +33,7 @@
 #include <regex>
 #include <cinttypes>
 
+#include "wel/LookupCacher.h"
 #include "wel/MetadataWalker.h"
 #include "wel/XMLString.h"
 #include "wel/UnicodeConversion.h"
@@ -173,6 +174,13 @@ const core::Property ConsumeWindowsEventLog::ProcessOldEvents(
   withDescription("This property defines if old events (which are created before first time server is started) should be processed.")->
   build());
 
+const core::Property ConsumeWindowsEventLog::CacheSidLookups(
+    core::PropertyBuilder::createProperty("Cache SID Lookups")->
+        isRequired(false)->
+        withDefaultValue<bool>(true)->
+        withDescription("Determines whether SID to name lookups are cached in memory")->
+        build());
+
 const core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
 
 ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, const utils::Identifier& uuid)
@@ -324,6 +332,9 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
   context->getProperty(MaxBufferSize.getName(), max_buffer_size_);
   logger_->log_debug("ConsumeWindowsEventLog: MaxBufferSize %" PRIu64, max_buffer_size_);
 
+  context->getProperty(CacheSidLookups.getName(), cache_sid_lookups_);
+  logger_->log_debug("ConsumeWindowsEventLog: will%s cache SID to name lookups", cache_sid_lookups_ ? "" : " not");
+
   provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
   logger_->log_trace("Successfully configured CWEL");
 }
@@ -488,7 +499,7 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
       for (size_t numberPos = 0; std::string::npos != (numberPos = nodeText.find(percentages, numberPos));) {
         numberPos += percentages.size();
 
-        uint64_t number{};
+        DWORD number{};
         try {
           // Assumption - first character is not '0', otherwise not all digits will be replaced by 'value'.
           number = std::stoul(&nodeText[numberPos]);
@@ -593,7 +604,7 @@ nonstd::expected<EventRender, std::string> ConsumeWindowsEventLog::createEventRe
   // this is a well known path.
   std::string provider_name = doc.child("Event").child("System").child("Provider").attribute("Name").value();
   wel::WindowsEventLogMetadataImpl metadata{getEventLogHandler(provider_name).getMetadata(), hEvent};
-  wel::MetadataWalker walker{metadata, channel_, !resolve_as_attributes_, apply_identifier_function_, regex_ ? &*regex_ : nullptr};
+  wel::MetadataWalker walker{metadata, channel_, !resolve_as_attributes_, apply_identifier_function_, regex_ ? &*regex_ : nullptr, userIdToUsernameFunction()};
 
   // resolve the event metadata
   doc.traverse(walker);
@@ -752,6 +763,16 @@ void ConsumeWindowsEventLog::LogWindowsError(const std::string& error) const {
   LocalFree(lpMsg);
 }
 
+std::function<std::string(const std::string&)> ConsumeWindowsEventLog::userIdToUsernameFunction() const {
+  static constexpr auto lookup = &utils::OsUtils::userIdToUsername;
+  if (cache_sid_lookups_) {
+    static auto cached_lookup = wel::LookupCacher{lookup};
+    return std::ref(cached_lookup);
+  } else {
+    return lookup;
+  }
+}
+
 REGISTER_RESOURCE(ConsumeWindowsEventLog, Processor);
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 258846b1e..5a3a5b908 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -79,6 +79,7 @@ class ConsumeWindowsEventLog : public core::Processor {
   EXTENSIONAPI static const core::Property BatchCommitSize;
   EXTENSIONAPI static const core::Property BookmarkRootDirectory;
   EXTENSIONAPI static const core::Property ProcessOldEvents;
+  EXTENSIONAPI static const core::Property CacheSidLookups;
   static auto properties() {
     return std::array{
         Channel,
@@ -94,7 +95,8 @@ class ConsumeWindowsEventLog : public core::Processor {
         JSONFormat,
         BatchCommitSize,
         BookmarkRootDirectory,
-        ProcessOldEvents
+        ProcessOldEvents,
+        CacheSidLookups
     };
   }
 
@@ -121,6 +123,7 @@ class ConsumeWindowsEventLog : public core::Processor {
   void LogWindowsError(const std::string& error = "Error") const;
   nonstd::expected<EventRender, std::string> createEventRender(EVT_HANDLE eventHandle);
   void substituteXMLPercentageItems(pugi::xml_document& doc);
+  std::function<std::string(const std::string&)> userIdToUsernameFunction() const;
 
   nonstd::expected<std::string, std::string> renderEventAsXml(EVT_HANDLE event_handle);
 
@@ -132,7 +135,6 @@ class ConsumeWindowsEventLog : public core::Processor {
   static constexpr const char* JSONSimple = "Simple";
   static constexpr const char* JSONFlattened = "Flattened";
 
- private:
   struct TimeDiff {
     auto operator()() const {
       return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
@@ -163,6 +165,7 @@ class ConsumeWindowsEventLog : public core::Processor {
   uint64_t max_buffer_size_{};
   std::map<std::string, wel::WindowsEventLogHandler> providers_;
   uint64_t batch_commit_size_{};
+  bool cache_sid_lookups_ = true;
 
   SMART_ENUM(JSONType,
       (None, "None"),
diff --git a/extensions/windows-event-log/tests/CMakeLists.txt b/extensions/windows-event-log/tests/CMakeLists.txt
index b9fe53737..a88a3c212 100644
--- a/extensions/windows-event-log/tests/CMakeLists.txt
+++ b/extensions/windows-event-log/tests/CMakeLists.txt
@@ -17,17 +17,17 @@
 # under the License.
 #
 
-set(WEL_INTEGRATION_TESTS  "BookmarkTests.cpp" "ConsumeWindowsEventLogTests.cpp" "MetadataWalkerTests.cpp")
+set(WEL_TESTS  "BookmarkTests.cpp" "ConsumeWindowsEventLogTests.cpp" "LookupCacherTests.cpp" "MetadataWalkerTests.cpp")
 if (TEST_CUSTOM_WEL_PROVIDER)
     execute_process(COMMAND
         "${CMAKE_CURRENT_LIST_DIR}/custom-provider/generate-and-register.bat"
         "${CMAKE_CURRENT_LIST_DIR}/custom-provider"
     )
-    list(APPEND WEL_INTEGRATION_TESTS "CWELCustomProviderTests.cpp")
+    list(APPEND WEL_TESTS "CWELCustomProviderTests.cpp")
 endif()
 
 SET(WEL_TEST_COUNT 0)
-FOREACH(testfile ${WEL_INTEGRATION_TESTS})
+FOREACH(testfile ${WEL_TESTS})
     get_filename_component(testfilename "${testfile}" NAME_WE)
     add_executable("${testfilename}" "${testfile}")
     target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/windows-event-log/")
diff --git a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
index c5f9d052d..44a2f125f 100644
--- a/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
+++ b/extensions/windows-event-log/tests/ConsumeWindowsEventLogTests.cpp
@@ -97,7 +97,8 @@ TEST_CASE("ConsumeWindowsEventLog properties work with default values", "[create
     ConsumeWindowsEventLog::OutputFormat,
     ConsumeWindowsEventLog::BatchCommitSize,
     ConsumeWindowsEventLog::BookmarkRootDirectory,  // TODO(fgerlits): obsolete, see definition; remove in a later release
-    ConsumeWindowsEventLog::ProcessOldEvents
+    ConsumeWindowsEventLog::ProcessOldEvents,
+    ConsumeWindowsEventLog::CacheSidLookups
   };
   for (const core::Property& property : properties_required_or_with_default_value) {
     if (!LogTestController::getInstance().contains("property name " + property.getName() + " value ")) {
diff --git a/extensions/windows-event-log/tests/LookupCacherTests.cpp b/extensions/windows-event-log/tests/LookupCacherTests.cpp
new file mode 100644
index 000000000..1bcd41058
--- /dev/null
+++ b/extensions/windows-event-log/tests/LookupCacherTests.cpp
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+
+#include "Catch.h"
+#include "wel/LookupCacher.h"
+
+namespace wel = org::apache::nifi::minifi::wel;
+
+namespace {
+struct DoubleTheInput {
+  std::string operator()(const std::string& key) {
+    keys_queried.push_back(key);
+    return key + key;
+  }
+
+  std::vector<std::string> keys_queried;
+};
+}
+
+TEST_CASE("LookupCacher can do lookups") {
+  DoubleTheInput lookup;
+  wel::LookupCacher lookup_cacher{std::ref(lookup)};
+
+  CHECK(lookup_cacher("foo") == "foofoo");
+  CHECK(lookup_cacher("bar") == "barbar");
+  CHECK(lookup_cacher("baa") == "baabaa");
+  CHECK(lookup.keys_queried == std::vector<std::string>{"foo", "bar", "baa"});
+}
+
+TEST_CASE("LookupCacher caches the lookups") {
+  DoubleTheInput lookup;
+  wel::LookupCacher lookup_cacher{std::ref(lookup)};
+  CHECK(lookup.keys_queried.empty());
+
+  lookup_cacher("foo");
+  CHECK(lookup.keys_queried.size() == 1);
+
+  lookup_cacher("foo");
+  CHECK(lookup.keys_queried.size() == 1);
+
+  lookup_cacher("foo");
+  CHECK(lookup.keys_queried.size() == 1);
+}
+
+TEST_CASE("LookupCacher lookups can expire") {
+  using namespace std::literals::chrono_literals;
+  DoubleTheInput lookup;
+  wel::LookupCacher lookup_cacher{std::ref(lookup), 10ms};
+  CHECK(lookup.keys_queried.empty());
+
+  lookup_cacher("foo");
+  CHECK(lookup.keys_queried.size() == 1);
+
+  lookup_cacher("foo");
+  CHECK(lookup.keys_queried.size() == 1);
+
+  std::this_thread::sleep_for(20ms);
+
+  lookup_cacher("foo");
+  CHECK(lookup.keys_queried.size() == 2);
+}
diff --git a/extensions/windows-event-log/tests/MetadataWalkerTests.cpp b/extensions/windows-event-log/tests/MetadataWalkerTests.cpp
index ee0339cfe..4dec8a544 100644
--- a/extensions/windows-event-log/tests/MetadataWalkerTests.cpp
+++ b/extensions/windows-event-log/tests/MetadataWalkerTests.cpp
@@ -22,6 +22,7 @@
 #include "TestBase.h"
 #include "Catch.h"
 #include "core/Core.h"
+#include "utils/OsUtils.h"
 #include "wel/MetadataWalker.h"
 #include "wel/XMLString.h"
 #include "pugixml.hpp"
@@ -36,7 +37,7 @@ namespace {
 
 std::string updateXmlMetadata(const std::string &xml, EVT_HANDLE metadata_ptr, EVT_HANDLE event_ptr, bool update_xml, bool resolve, utils::Regex const* regex = nullptr) {
   WindowsEventLogMetadataImpl metadata{metadata_ptr, event_ptr};
-  MetadataWalker walker(metadata, "", update_xml, resolve, regex);
+  MetadataWalker walker(metadata, "", update_xml, resolve, regex, &utils::OsUtils::userIdToUsername);
 
   pugi::xml_document doc;
   pugi::xml_parse_result result = doc.load_string(xml.c_str());
@@ -118,24 +119,13 @@ TEST_CASE("MetadataWalker will leave a Sid unchanged if it doesn't correspond to
 TEST_CASE("MetadataWalker can replace multiple Sids", "[updateXmlMetadata]") {
   std::string xml = readFile("resources/multiplesids.xml");
 
-  std::string programmaticallyResolved;
-
   pugi::xml_document doc;
   xml = updateXmlMetadata(xml, nullptr, nullptr, false, true);
   pugi::xml_parse_result result = doc.load_string(xml.c_str());
+  REQUIRE(result);
 
-  for (const auto &node : doc.child("Event").child("EventData").children()) {
-    auto name = node.attribute("Name").as_string();
-    if (utils::StringUtils::equalsIgnoreCase("GroupMembership", name)) {
-      programmaticallyResolved = node.text().get();
-      break;
-    }
-  }
-
-  std::string expected = "Nobody Everyone Null Authority";
-
-  // we are only testing mulitiple sid resolutions, not the resolution of other items.
-  REQUIRE(expected == programmaticallyResolved);
+  // we are only testing multiple sid resolutions, not the resolution of other items.
+  CHECK(std::string_view("Nobody Everyone Null Authority") == doc.select_node("Event/EventData/Data[@Name='GroupMembership']").node().text().get());
 }
 
 namespace {
@@ -150,10 +140,10 @@ void extractMappingsTestHelper(const std::string &file_name,
   REQUIRE(!input_xml.empty());
   pugi::xml_document doc;
   pugi::xml_parse_result result = doc.load_string(input_xml.c_str());
-  CHECK(result);
+  REQUIRE(result);
 
   auto regex = utils::Regex(".*Sid");
-  MetadataWalker walker(FakeWindowsEventLogMetadata{}, METADATA_WALKER_TESTS_LOG_NAME, update_xml, resolve, &regex);
+  MetadataWalker walker(FakeWindowsEventLogMetadata{}, METADATA_WALKER_TESTS_LOG_NAME, update_xml, resolve, &regex, &utils::OsUtils::userIdToUsername);
   doc.traverse(walker);
 
   CHECK(walker.getIdentifiers() == expected_identifiers);
diff --git a/extensions/windows-event-log/wel/LookupCacher.cpp b/extensions/windows-event-log/wel/LookupCacher.cpp
new file mode 100644
index 000000000..fdb240a8a
--- /dev/null
+++ b/extensions/windows-event-log/wel/LookupCacher.cpp
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "LookupCacher.h"
+
+namespace org::apache::nifi::minifi::wel {
+
+std::string LookupCacher::operator()(const std::string& key) {
+  {
+    std::lock_guard<std::mutex> lock{mutex_};
+    const auto it = cache_.find(key);
+    if (it != cache_.end() && it->second.expiry > std::chrono::system_clock::now()) {
+      return it->second.value;
+    }
+  }
+
+  std::string value = lookup_function_(key);
+
+  std::lock_guard<std::mutex> lock{mutex_};
+  cache_.insert_or_assign(key, CacheItem{value, std::chrono::system_clock::now() + lifetime_});
+  return value;
+}
+
+}  // namespace org::apache::nifi::minifi::wel
diff --git a/extensions/windows-event-log/wel/LookupCacher.h b/extensions/windows-event-log/wel/LookupCacher.h
new file mode 100644
index 000000000..7122c865c
--- /dev/null
+++ b/extensions/windows-event-log/wel/LookupCacher.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+namespace org::apache::nifi::minifi::wel {
+
+class LookupCacher {
+ public:
+  explicit LookupCacher(std::function<std::string(const std::string&)> lookup_function, std::chrono::milliseconds lifetime = std::chrono::hours{24})
+    : lookup_function_(std::move(lookup_function)),
+      lifetime_(lifetime) {}
+  std::string operator()(const std::string& key);
+
+ private:
+  struct CacheItem {
+    std::string value;
+    std::chrono::system_clock::time_point expiry;
+  };
+
+  std::mutex mutex_;
+  std::function<std::string(const std::string&)> lookup_function_;
+  std::chrono::milliseconds lifetime_;
+  std::unordered_map<std::string, CacheItem> cache_;
+};
+
+}  // namespace org::apache::nifi::minifi::wel
diff --git a/extensions/windows-event-log/wel/MetadataWalker.cpp b/extensions/windows-event-log/wel/MetadataWalker.cpp
index 8b5ca7c5e..2181db17c 100644
--- a/extensions/windows-event-log/wel/MetadataWalker.cpp
+++ b/extensions/windows-event-log/wel/MetadataWalker.cpp
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-#include <windows.h>
 #include <strsafe.h>
 
 #include <map>
@@ -39,7 +38,7 @@ bool MetadataWalker::for_each(pugi::xml_node &node) {
     for (pugi::xml_attribute attr : node.attributes())  {
       const auto idUpdate = [&](const std::string &input) {
         if (resolve_) {
-          auto resolved = utils::OsUtils::userIdToUsername(input);
+          auto resolved = user_id_to_username_fn_(input);
           replaced_identifiers_[input] = resolved;
           return resolved;
         }
@@ -61,7 +60,7 @@ bool MetadataWalker::for_each(pugi::xml_node &node) {
       std::string nodeText = node.text().get();
       std::vector<std::string> ids = getIdentifiers(nodeText);
       for (const auto &id : ids) {
-        auto  resolved = utils::OsUtils::userIdToUsername(id);
+        auto  resolved = user_id_to_username_fn_(id);
         std::string replacement = "%{" + id + "}";
         replaced_identifiers_[id] = resolved;
         replaced_identifiers_[replacement] = resolved;
diff --git a/extensions/windows-event-log/wel/MetadataWalker.h b/extensions/windows-event-log/wel/MetadataWalker.h
index 014dadbdb..a2d949338 100644
--- a/extensions/windows-event-log/wel/MetadataWalker.h
+++ b/extensions/windows-event-log/wel/MetadataWalker.h
@@ -23,7 +23,7 @@
 #include <Windows.h>
 #include <winevt.h>
 #include <codecvt>
-
+#include <functional>
 #include <map>
 #include <sstream>
 #include <string>
@@ -34,7 +34,6 @@
 #include "core/Core.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
-#include "utils/OsUtils.h"
 #include "FlowFileRecord.h"
 #include "WindowsEventLog.h"
 
@@ -50,12 +49,14 @@ namespace org::apache::nifi::minifi::wel {
  */
 class MetadataWalker : public pugi::xml_tree_walker {
  public:
-  MetadataWalker(const WindowsEventLogMetadata& windows_event_log_metadata, std::string log_name, bool update_xml, bool resolve, utils::Regex const* regex)
+  MetadataWalker(const WindowsEventLogMetadata& windows_event_log_metadata, std::string log_name, bool update_xml, bool resolve, utils::Regex const* regex,
+      std::function<std::string(std::string)> user_id_to_username_fn)
       : windows_event_log_metadata_(windows_event_log_metadata),
         log_name_(std::move(log_name)),
         regex_(regex),
         update_xml_(update_xml),
-        resolve_(resolve) {
+        resolve_(resolve),
+        user_id_to_username_fn_(std::move(user_id_to_username_fn)) {
   }
 
   /**
@@ -93,6 +94,7 @@ class MetadataWalker : public pugi::xml_tree_walker {
   utils::Regex const * const regex_;
   const bool update_xml_;
   const bool resolve_;
+  std::function<std::string(const std::string&)> user_id_to_username_fn_;
   std::map<std::string, std::string> metadata_;
   std::map<std::string, std::string> fields_values_;
   std::map<std::string, std::string> replaced_identifiers_;


[nifi-minifi-cpp] 04/04: MINIFICPP-2007 Add rocksdb compression options

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2c7f989aea41d801ba22507b703664cae9dac1cc
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Fri Feb 17 01:16:13 2023 +0100

    MINIFICPP-2007 Add rocksdb compression options
    
    - Add bundled zstd and lz4 thirdparty libraries
    - Upgrade rocksdb to version 7.7.3
    - Add compression options bzip2, zlib, zstd and lz4 on Unix and xpress
      on Windows
    
    Closes #1480
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 CMakeLists.txt                                     |   6 ++
 CONFIGURE.md                                       |   9 ++
 LICENSE                                            | 102 ++++++++++++++++-----
 NOTICE                                             |   2 +
 cmake/BundledRocksDB.cmake                         |  35 ++++++-
 cmake/BundledZLIB.cmake                            |   1 -
 cmake/LZ4.cmake                                    |  53 +++++++++++
 cmake/Zstd.cmake                                   |  56 +++++++++++
 cmake/lz4/dummy/Findlz4.cmake                      |  33 +++++++
 cmake/zstd/dummy/Findzstd.cmake                    |  33 +++++++
 conf/minifi.properties                             |   2 +
 extensions/libarchive/CMakeLists.txt               |   6 --
 .../rocksdb-repos/DatabaseContentRepository.cpp    |   5 +-
 extensions/rocksdb-repos/FlowFileRepository.cpp    |   5 +-
 extensions/rocksdb-repos/database/RocksDbUtils.cpp |  54 +++++++++++
 extensions/rocksdb-repos/database/RocksDbUtils.h   |  21 ++---
 .../rocksdb-repos/database/StringAppender.cpp      |   2 +-
 .../standard-processors/tests/unit/PutTCPTests.cpp |   4 +-
 libminifi/include/properties/Configuration.h       |   2 +
 libminifi/src/Configuration.cpp                    |   2 +
 ...e_gcc_clang_compiler_options_from_windows.patch |  37 ++++++++
 21 files changed, 420 insertions(+), 50 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 513afe7ae..3eac599b1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -214,6 +214,12 @@ else()
     message(VERBOSE "No custom malloc implementation")
 endif()
 
+if (NOT DISABLE_BZIP2 AND (NOT DISABLE_LIBARCHIVE OR (NOT DISABLE_ROCKSDB AND NOT WIN32)))
+    include(BundledBZip2)
+    use_bundled_bzip2(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
+    list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/bzip2/dummy")
+endif()
+
 if(NOT WIN32)
     if (ENABLE_JNI)
         set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_JNI")
diff --git a/CONFIGURE.md b/CONFIGURE.md
index 304407ecb..c9beff2c1 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -162,6 +162,15 @@ folder. You may specify your own path in place of these defaults.
     nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
     nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 
+### Configuring compression for rocksdb database
+
+Rocksdb has an option to set compression type for its database to use less disk space.
+If content repository or flow file repository is set to use the rocksdb database as their storage, then we have the option to compress those repositories. On Unix operating systems `zlib`, `bzip2`, `zstd`, `lz4` and `lz4hc` compression types and on Windows `xpress` compression type is supported by MiNiFi C++. If the property is set to `auto` then `xpress` will be used on Windows, `zstd` on Unix operating systems. These options can be set in the minifi.properties file with the following pr [...]
+
+     in minifi.properties
+     nifi.flowfile.repository.rocksdb.compression=zlib
+     nifi.content.repository.rocksdb.compression=auto
+
 #### Shared database
 
 It is also possible to use a single database to store multiple repositories with the `minifidb://` scheme.
diff --git a/LICENSE b/LICENSE
index 823ac8688..e2697b9d3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -3407,32 +3407,90 @@ For these and/or other purposes and motivations, and without any expectation of
     Affirmer understands and acknowledges that Creative Commons is not a party to this document and has no duty or obligation with respect to this CC0 or use of the Work.
 ```
 
---------------------------------------------------------------------------
-
 This product bundles 'prometheus-cpp' which is available under an MIT license.
 
-  MIT License
+    MIT License
 
-  Copyright (c) 2016-2021 Jupp Mueller
-  Copyright (c) 2017-2022 Gregor Jasny
+    Copyright (c) 2016-2021 Jupp Mueller
+    Copyright (c) 2017-2022 Gregor Jasny
 
-  And many contributors, see
-  https://github.com/jupp0r/prometheus-cpp/graphs/contributors
+    And many contributors, see
+    https://github.com/jupp0r/prometheus-cpp/graphs/contributors
 
-  Permission is hereby granted, free of charge, to any person obtaining a copy
-  of this software and associated documentation files (the "Software"), to deal
-  in the Software without restriction, including without limitation the rights
-  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-  copies of the Software, and to permit persons to whom the Software is
-  furnished to do so, subject to the following conditions:
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"), to deal
+    in the Software without restriction, including without limitation the rights
+    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+    copies of the Software, and to permit persons to whom the Software is
+    furnished to do so, subject to the following conditions:
 
-  The above copyright notice and this permission notice shall be included in all
-  copies or substantial portions of the Software.
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
 
-  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-  SOFTWARE.
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE.
+
+This product bundles 'Zstandard' which is available under a BSD License.
+
+    BSD License
+
+    For Zstandard software
+
+    Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+
+    Redistribution and use in source and binary forms, with or without modification,
+    are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    * Neither the name Facebook nor the names of its contributors may be used to
+      endorse or promote products derived from this software without specific
+      prior written permission.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+    ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+    ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+This product bundles 'LZ4 Library' which is available under a BSD 2-Clause license.
+
+    LZ4 Library
+    Copyright (c) 2011-2020, Yann Collet
+    All rights reserved.
+
+    Redistribution and use in source and binary forms, with or without modification,
+    are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice, this
+      list of conditions and the following disclaimer in the documentation and/or
+      other materials provided with the distribution.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+    ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+    ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/NOTICE b/NOTICE
index 7502e6c9e..60476b637 100644
--- a/NOTICE
+++ b/NOTICE
@@ -68,6 +68,8 @@ This software includes third party software subject to the following copyrights:
 - abseil-cpp - Google Inc.
 - crc32c - Google Inc., Fangming Fang, Vadim Skipin, Rodrigo Tobar, Harry Mallon
 - prometheus-cpp - Copyright (c) 2016-2021 Jupp Mueller, Copyright (c) 2017-2022 Gregor Jasny
+- Zstandard - Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+- LZ4 Library - Copyright (c) 2011-2020, Yann Collet
 
 The licenses for these third party components are included in LICENSE.txt
 
diff --git a/cmake/BundledRocksDB.cmake b/cmake/BundledRocksDB.cmake
index 989204ebc..b44855ee1 100644
--- a/cmake/BundledRocksDB.cmake
+++ b/cmake/BundledRocksDB.cmake
@@ -18,6 +18,14 @@
 function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR)
     message("Using bundled RocksDB")
 
+    if (NOT WIN32)
+        include(Zstd)
+        list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/zstd/dummy")
+
+        include(LZ4)
+        list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/lz4/dummy")
+    endif()
+
     # Define byproducts
     if (WIN32)
         set(BYPRODUCT "lib/rocksdb.lib")
@@ -43,18 +51,29 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR)
         list(APPEND ROCKSDB_CMAKE_ARGS -DPORTABLE=ON)
     endif()
     if(WIN32)
-        list(APPEND ROCKSDB_CMAKE_ARGS -DROCKSDB_INSTALL_ON_WINDOWS=ON)
+        list(APPEND ROCKSDB_CMAKE_ARGS
+                -DROCKSDB_INSTALL_ON_WINDOWS=ON
+                -DWITH_XPRESS=ON)
+    else()
+        list(APPEND ROCKSDB_CMAKE_ARGS
+                -DWITH_ZLIB=ON
+                -DWITH_BZ2=ON
+                -DWITH_ZSTD=ON
+                -DWITH_LZ4=ON)
     endif()
 
+    append_third_party_passthrough_args(ROCKSDB_CMAKE_ARGS "${ROCKSDB_CMAKE_ARGS}")
+
     # Build project
     ExternalProject_Add(
             rocksdb-external
-            URL "https://github.com/facebook/rocksdb/archive/refs/tags/v6.29.5.tar.gz"
-            URL_HASH "SHA256=ddbf84791f0980c0bbce3902feb93a2c7006f6f53bfd798926143e31d4d756f0"
+            URL "https://github.com/facebook/rocksdb/archive/refs/tags/v7.7.3.tar.gz"
+            URL_HASH "SHA256=b8ac9784a342b2e314c821f6d701148912215666ac5e9bdbccd93cf3767cb611"
             SOURCE_DIR "${BINARY_DIR}/thirdparty/rocksdb-src"
             CMAKE_ARGS ${ROCKSDB_CMAKE_ARGS}
             BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/rocksdb-install/${BYPRODUCT}"
             EXCLUDE_FROM_ALL TRUE
+            LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
     )
 
     # Set variables
@@ -66,11 +85,17 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR)
     # Create imported targets
     add_library(RocksDB::RocksDB STATIC IMPORTED)
     set_target_properties(RocksDB::RocksDB PROPERTIES IMPORTED_LOCATION "${ROCKSDB_LIBRARY}")
+    if (NOT WIN32)
+        add_dependencies(rocksdb-external ZLIB::ZLIB BZip2::BZip2 zstd::zstd lz4::lz4)
+    endif()
     add_dependencies(RocksDB::RocksDB rocksdb-external)
     file(MAKE_DIRECTORY ${ROCKSDB_INCLUDE_DIR})
-    set_property(TARGET RocksDB::RocksDB APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${ROCKSDB_INCLUDE_DIR})
+    target_include_directories(RocksDB::RocksDB INTERFACE ${ROCKSDB_INCLUDE_DIR})
     set_property(TARGET RocksDB::RocksDB APPEND PROPERTY INTERFACE_LINK_LIBRARIES Threads::Threads)
+    target_link_libraries(RocksDB::RocksDB INTERFACE Threads::Threads)
     if(WIN32)
-        set_property(TARGET RocksDB::RocksDB APPEND PROPERTY INTERFACE_LINK_LIBRARIES Rpcrt4.lib)
+        target_link_libraries(RocksDB::RocksDB INTERFACE Rpcrt4.lib Cabinet.lib)
+    else()
+        target_link_libraries(RocksDB::RocksDB INTERFACE ZLIB::ZLIB BZip2::BZip2 zstd::zstd lz4::lz4)
     endif()
 endfunction(use_bundled_rocksdb)
diff --git a/cmake/BundledZLIB.cmake b/cmake/BundledZLIB.cmake
index 8d2d3542f..16b77c907 100644
--- a/cmake/BundledZLIB.cmake
+++ b/cmake/BundledZLIB.cmake
@@ -69,6 +69,5 @@ function(use_bundled_zlib SOURCE_DIR BINARY_DIR)
     add_library(ZLIB::ZLIB STATIC IMPORTED)
     set_target_properties(ZLIB::ZLIB PROPERTIES IMPORTED_LOCATION "${ZLIB_LIBRARIES}")
     add_dependencies(ZLIB::ZLIB zlib-external)
-    file(MAKE_DIRECTORY ${ZLIB_INCLUDE_DIRS})
     set_property(TARGET ZLIB::ZLIB APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${ZLIB_INCLUDE_DIRS})
 endfunction(use_bundled_zlib)
diff --git a/cmake/LZ4.cmake b/cmake/LZ4.cmake
new file mode 100644
index 000000000..40260740b
--- /dev/null
+++ b/cmake/LZ4.cmake
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include(FetchContent)
+
+set(LZ4_BUILD_CLI OFF CACHE BOOL "" FORCE)
+set(LZ4_BUILD_LEGACY_LZ4C OFF CACHE BOOL "" FORCE)
+set(BUILD_SHARED_LIBS OFF CACHE BOOL "" FORCE)
+set(BUILD_STATIC_LIBS ON CACHE BOOL "" FORCE)
+
+FetchContent_Declare(lz4
+    URL            https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz
+    URL_HASH       SHA256=0b0e3aa07c8c063ddf40b082bdf7e37a1562bda40a0ff5272957f3e987e0e54b
+)
+
+# With CMake >= 3.18, this block could be replaced with FetchContent_MakeAvailable(lz4),
+# if we add the `SOURCE_SUBDIR build/cmake` option to FetchContent_Declare() [this option is not available in CMake < 3.18].
+# As of July 2022, one of our supported platforms, Centos 7, comes with CMake 3.17.
+FetchContent_GetProperties(lz4)
+if(NOT lz4_POPULATED)
+    FetchContent_Populate(lz4)
+    # the top level doesn't contain CMakeLists.txt, it is in the "build/cmake" subdirectory
+    add_subdirectory(${lz4_SOURCE_DIR}/build/cmake ${lz4_BINARY_DIR})
+endif()
+
+add_library(lz4::lz4 ALIAS lz4_static)
+
+# Set variables
+set(LZ4_FOUND "YES" CACHE STRING "" FORCE)
+set(LZ4_INCLUDE_DIRS "${lz4_SOURCE_DIR}/lib" CACHE STRING "" FORCE)
+if (WIN32)
+    set(LZ4_LIBRARIES "${lz4_BINARY_DIR}/lib/${CMAKE_BUILD_TYPE}/lz4_static.lib" CACHE STRING "" FORCE)
+else()
+    set(LZ4_LIBRARIES "${lz4_BINARY_DIR}/liblz4.a" CACHE STRING "" FORCE)
+endif()
+
+# Set exported variables for FindPackage.cmake
+set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_LZ4_INCLUDE_DIRS=${LZ4_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_LZ4_LIBRARIES=${LZ4_LIBRARIES}" CACHE STRING "" FORCE)
diff --git a/cmake/Zstd.cmake b/cmake/Zstd.cmake
new file mode 100644
index 000000000..7d8a2313a
--- /dev/null
+++ b/cmake/Zstd.cmake
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include(FetchContent)
+
+set(ZSTD_BUILD_SHARED OFF CACHE BOOL "" FORCE)
+
+if (WIN32)
+    set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/zstd/exclude_gcc_clang_compiler_options_from_windows.patch")
+    set(PC "${Patch_EXECUTABLE}" -p1 -i "${PATCH_FILE}")
+endif()
+
+FetchContent_Declare(zstd
+    URL            https://github.com/facebook/zstd/archive/refs/tags/v1.5.2.tar.gz
+    URL_HASH       SHA256=f7de13462f7a82c29ab865820149e778cbfe01087b3a55b5332707abf9db4a6e
+    PATCH_COMMAND "${PC}"
+)
+
+# With CMake >= 3.18, this block could be replaced with FetchContent_MakeAvailable(zstd),
+# if we add the `SOURCE_SUBDIR build/cmake` option to FetchContent_Declare() [this option is not available in CMake < 3.18].
+# As of July 2022, one of our supported platforms, Centos 7, comes with CMake 3.17.
+FetchContent_GetProperties(zstd)
+if(NOT zstd_POPULATED)
+    FetchContent_Populate(zstd)
+    # the top level doesn't contain CMakeLists.txt, it is in the "build/cmake" subdirectory
+    add_subdirectory(${zstd_SOURCE_DIR}/build/cmake ${zstd_BINARY_DIR})
+endif()
+
+add_library(zstd::zstd ALIAS libzstd_static)
+
+# Set variables
+set(ZSTD_FOUND "YES" CACHE STRING "" FORCE)
+set(ZSTD_INCLUDE_DIRS "${zstd_SOURCE_DIR}/lib" CACHE STRING "" FORCE)
+if (WIN32)
+    set(ZSTD_LIBRARIES "${zstd_BINARY_DIR}/lib/${CMAKE_BUILD_TYPE}/zstd_static.lib" CACHE STRING "" FORCE)
+else()
+    set(ZSTD_LIBRARIES "${zstd_BINARY_DIR}/lib/libzstd.a" CACHE STRING "" FORCE)
+endif()
+
+# Set exported variables for FindPackage.cmake
+set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_ZSTD_INCLUDE_DIRS=${ZSTD_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+set(PASSTHROUGH_VARIABLES ${PASSTHROUGH_VARIABLES} "-DEXPORTED_ZSTD_LIBRARIES=${ZSTD_LIBRARIES}" CACHE STRING "" FORCE)
diff --git a/cmake/lz4/dummy/Findlz4.cmake b/cmake/lz4/dummy/Findlz4.cmake
new file mode 100644
index 000000000..b5ab2e66e
--- /dev/null
+++ b/cmake/lz4/dummy/Findlz4.cmake
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Dummy lz4 find for when we use bundled version
+if(NOT LZ4_FOUND)
+    set(LZ4_FOUND "YES" CACHE STRING "" FORCE)
+    set(LZ4_INCLUDE_DIR "${EXPORTED_LZ4_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+    set(LZ4_INCLUDE_DIRS "${EXPORTED_LZ4_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+    set(LZ4_LIBRARIES "${EXPORTED_LZ4_LIBRARIES}" CACHE STRING "" FORCE)
+endif()
+
+if(NOT TARGET lz4::lz4)
+    add_library(lz4::lz4 STATIC IMPORTED)
+    set_target_properties(lz4::lz4 PROPERTIES
+            INTERFACE_INCLUDE_DIRECTORIES "${LZ4_INCLUDE_DIRS}")
+    set_target_properties(lz4::lz4 PROPERTIES
+            IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+            IMPORTED_LOCATION "${LZ4_LIBRARIES}")
+endif()
diff --git a/cmake/zstd/dummy/Findzstd.cmake b/cmake/zstd/dummy/Findzstd.cmake
new file mode 100644
index 000000000..4d857e32c
--- /dev/null
+++ b/cmake/zstd/dummy/Findzstd.cmake
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Dummy zstd find for when we use bundled version
+if(NOT zstd_FOUND)
+    set(zstd_FOUND "YES" CACHE STRING "" FORCE)
+    set(ZSTD_INCLUDE_DIR "${EXPORTED_ZSTD_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+    set(ZSTD_INCLUDE_DIRS "${EXPORTED_ZSTD_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+    set(ZSTD_LIBRARIES "${EXPORTED_ZSTD_LIBRARIES}" CACHE STRING "" FORCE)
+endif()
+
+if(NOT TARGET zstd::zstd)
+    add_library(zstd::zstd STATIC IMPORTED)
+    set_target_properties(zstd::zstd PROPERTIES
+            INTERFACE_INCLUDE_DIRECTORIES "${ZSTD_INCLUDE_DIRS}")
+    set_target_properties(zstd::zstd PROPERTIES
+            IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+            IMPORTED_LOCATION "${ZSTD_LIBRARIES}")
+endif()
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 491a94016..856de8fb1 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -28,9 +28,11 @@ nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repositor
 nifi.provenance.repository.max.storage.time=1 MIN
 nifi.provenance.repository.max.storage.size=1 MB
 nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
+# nifi.flowfile.repository.rocksdb.compression=auto
 nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 nifi.provenance.repository.class.name=NoOpRepository
 nifi.content.repository.class.name=DatabaseContentRepository
+# nifi.content.repository.rocksdb.compression=auto
 
 #nifi.remote.input.secure=true
 #nifi.security.need.ClientAuth=
diff --git a/extensions/libarchive/CMakeLists.txt b/extensions/libarchive/CMakeLists.txt
index 674306dc7..2b89d3ea7 100644
--- a/extensions/libarchive/CMakeLists.txt
+++ b/extensions/libarchive/CMakeLists.txt
@@ -27,12 +27,6 @@ if (NOT DISABLE_LZMA)
     list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/liblzma/dummy")
 endif()
 
-if (NOT DISABLE_BZIP2)
-    include(BundledBZip2)
-    use_bundled_bzip2(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
-    list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/bzip2/dummy")
-endif()
-
 include(BundledLibArchive)
 use_bundled_libarchive(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
 
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 881de1085..9beeb2498 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -53,10 +53,13 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
       db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
     }
   };
-  auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts){
+  auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) {
     cf_opts.OptimizeForPointLookup(4);
     cf_opts.merge_operator = std::make_shared<StringAppender>();
     cf_opts.max_successive_merges = 0;
+    if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) {
+      cf_opts.compression = *compression_type;
+    }
   };
   db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_);
   if (db_->open()) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index ecb9a1bcc..8637b4282 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -220,11 +220,14 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
   // To avoid DB write issues during heavy load it's recommended to have high number of buffer.
   // Rocksdb's stall feature can also trigger in case the number of buffers is >= 3.
   // The more buffers we have the more memory rocksdb can utilize without significant memory consumption under low load.
-  auto cf_options = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+  auto cf_options = [&configure] (rocksdb::ColumnFamilyOptions& cf_opts) {
     cf_opts.OptimizeForPointLookup(4);
     cf_opts.write_buffer_size = 8ULL << 20U;
     cf_opts.max_write_buffer_number = 20;
     cf_opts.min_write_buffer_number_to_merge = 1;
+    if (auto compression_type = minifi::internal::readConfiguredCompressionType(configure, Configure::nifi_flow_repository_rocksdb_compression)) {
+      cf_opts.compression = *compression_type;
+    }
   };
   db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_);
   if (db_->open()) {
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.cpp b/extensions/rocksdb-repos/database/RocksDbUtils.cpp
new file mode 100644
index 000000000..d3834dfef
--- /dev/null
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.cpp
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RocksDbUtils.h"
+
+#include <string>
+
+#include "Exception.h"
+
+namespace org::apache::nifi::minifi::internal {
+
+std::optional<rocksdb::CompressionType> readConfiguredCompressionType(const std::shared_ptr<Configure> &configuration, const std::string& config_key) {  // maps the string stored under config_key to a RocksDB compression type
+  std::string value;
+  if (!configuration->get(config_key, value) || value.empty()) {
+    return std::nullopt;  // lookup failed or value is empty: no compression type to report
+  }
+#ifdef WIN32  // in this branch only "auto" and "xpress" are accepted
+  if (value == "auto" || value == "xpress") {
+    return rocksdb::CompressionType::kXpressCompression;
+  } else {
+    throw Exception(REPOSITORY_EXCEPTION, "RocksDB compression type not supported: " + value);  // any other value is rejected
+  }
+#else  // without WIN32: "zlib", "bzip2", "zstd", "lz4", "lz4hc" and "auto" are accepted (exact, case-sensitive match)
+  if (value == "zlib") {
+    return rocksdb::CompressionType::kZlibCompression;
+  } else if (value == "bzip2") {
+    return rocksdb::CompressionType::kBZip2Compression;
+  } else if (value == "auto" || value == "zstd") {  // "auto" resolves to zstd in this branch
+    return rocksdb::CompressionType::kZSTD;
+  } else if (value == "lz4") {
+    return rocksdb::CompressionType::kLZ4Compression;
+  } else if (value == "lz4hc") {
+    return rocksdb::CompressionType::kLZ4HCCompression;
+  } else {
+    throw Exception(REPOSITORY_EXCEPTION, "RocksDB compression type not supported: " + value);  // unrecognized value is rejected
+  }
+#endif  // WIN32
+}
+
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h b/extensions/rocksdb-repos/database/RocksDbUtils.h
index d42afce6a..b5748c117 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.h
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.h
@@ -15,19 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
 #include <functional>
 #include <algorithm>
+#include <optional>
+#include <memory>
+#include <string>
+
 #include "rocksdb/db.h"
+#include "rocksdb/options.h"
 #include "utils/GeneralUtils.h"
+#include "properties/Configure.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 enum class RocksDbMode {
   ReadOnly,
@@ -69,8 +70,6 @@ class Writable {
 using DBOptionsPatch = std::function<void(Writable<rocksdb::DBOptions>&)>;
 using ColumnFamilyOptionsPatch = std::function<void(rocksdb::ColumnFamilyOptions&)>;
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+std::optional<rocksdb::CompressionType> readConfiguredCompressionType(const std::shared_ptr<Configure> &configuration, const std::string& config_key);
+
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/StringAppender.cpp b/extensions/rocksdb-repos/database/StringAppender.cpp
index 4580dcdbc..7f46e5943 100644
--- a/extensions/rocksdb-repos/database/StringAppender.cpp
+++ b/extensions/rocksdb-repos/database/StringAppender.cpp
@@ -39,7 +39,7 @@ bool StringAppender::Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice*
   return true;
 }
 
-static auto string_appender_registrar = rocksdb::ObjectLibrary::Default()->Register<StringAppender>(
+static auto string_appender_registrar = rocksdb::ObjectLibrary::Default()->AddFactory<StringAppender>(
     "StringAppender",
     [] (const std::string& /* uri */, std::unique_ptr<StringAppender>* out, std::string* /* errmsg */) {
       *out = std::make_unique<StringAppender>();
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index ac44c4db8..524c451b2 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -202,8 +202,8 @@ class PutTCPTestFixture {
     put_tcp_->setProperty(PutTCP::Port, utils::StringUtils::join_pack("${literal('", std::to_string(port), "')}"));
   }
 
-  void setPutTCPPort(std::string port_str) {
-    put_tcp_->setProperty(PutTCP::Port, std::move(port_str));
+  void setPutTCPPort(const std::string& port_str) {
+    put_tcp_->setProperty(PutTCP::Port, port_str);
   }
 
   [[nodiscard]] uint16_t getSinglePort() const {
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 25e6b5ebc..978600bb3 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -51,7 +51,9 @@ class Configuration : public Properties {
   static constexpr const char *nifi_server_name = "nifi.server.name";
   static constexpr const char *nifi_configuration_class_name = "nifi.flow.configuration.class.name";
   static constexpr const char *nifi_flow_repository_class_name = "nifi.flowfile.repository.class.name";
+  static constexpr const char *nifi_flow_repository_rocksdb_compression = "nifi.flowfile.repository.rocksdb.compression";
   static constexpr const char *nifi_content_repository_class_name = "nifi.content.repository.class.name";
+  static constexpr const char *nifi_content_repository_rocksdb_compression = "nifi.content.repository.rocksdb.compression";
   static constexpr const char *nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name";
   static constexpr const char *nifi_volatile_repository_options_flowfile_max_count = "nifi.volatile.repository.options.flowfile.max.count";
   static constexpr const char *nifi_volatile_repository_options_flowfile_max_bytes = "nifi.volatile.repository.options.flowfile.max.bytes";
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 78ae790f7..c584cb574 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -36,7 +36,9 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_server_name},
   core::ConfigurationProperty{Configuration::nifi_configuration_class_name},
   core::ConfigurationProperty{Configuration::nifi_flow_repository_class_name},
+  core::ConfigurationProperty{Configuration::nifi_flow_repository_rocksdb_compression},
   core::ConfigurationProperty{Configuration::nifi_content_repository_class_name},
+  core::ConfigurationProperty{Configuration::nifi_content_repository_rocksdb_compression},
   core::ConfigurationProperty{Configuration::nifi_provenance_repository_class_name},
   core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_flowfile_max_count, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_flowfile_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())},
diff --git a/thirdparty/zstd/exclude_gcc_clang_compiler_options_from_windows.patch b/thirdparty/zstd/exclude_gcc_clang_compiler_options_from_windows.patch
new file mode 100644
index 000000000..c32f78f4f
--- /dev/null
+++ b/thirdparty/zstd/exclude_gcc_clang_compiler_options_from_windows.patch
@@ -0,0 +1,37 @@
+diff --git a/build/cmake/tests/CMakeLists.txt b/build/cmake/tests/CMakeLists.txt
+index 8bba6ea6..ce84ed8c 100644
+--- a/build/cmake/tests/CMakeLists.txt
++++ b/build/cmake/tests/CMakeLists.txt
+@@ -57,7 +57,9 @@ target_link_libraries(datagen libzstd_static)
+ # fullbench
+ #
+ add_executable(fullbench ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${TESTS_DIR}/fullbench.c)
+-set_property(TARGET fullbench APPEND PROPERTY COMPILE_OPTIONS "-Wno-deprecated-declarations")
++if (NOT WIN32)
++    set_property(TARGET fullbench APPEND PROPERTY COMPILE_OPTIONS "-Wno-deprecated-declarations")
++endif()
+ target_link_libraries(fullbench libzstd_static)
+ add_test(NAME fullbench COMMAND fullbench ${ZSTD_FULLBENCH_FLAGS})
+ 
+@@ -65,7 +67,9 @@ add_test(NAME fullbench COMMAND fullbench ${ZSTD_FULLBENCH_FLAGS})
+ # fuzzer
+ #
+ add_executable(fuzzer ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${TESTS_DIR}/fuzzer.c)
+-set_property(TARGET fuzzer APPEND PROPERTY COMPILE_OPTIONS "-Wno-deprecated-declarations")
++if (NOT WIN32)
++    set_property(TARGET fuzzer APPEND PROPERTY COMPILE_OPTIONS "-Wno-deprecated-declarations")
++endif()
+ target_link_libraries(fuzzer libzstd_static)
+ AddTestFlagsOption(ZSTD_FUZZER_FLAGS "$ENV{FUZZERTEST} $ENV{FUZZER_FLAGS}"
+     "Semicolon-separated list of flags to pass to the fuzzer test (see `fuzzer -h` for usage)")
+@@ -78,7 +82,9 @@ add_test(NAME fuzzer COMMAND fuzzer ${ZSTD_FUZZER_FLAGS})
+ # zstreamtest
+ #
+ add_executable(zstreamtest ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${TESTS_DIR}/seqgen.c ${TESTS_DIR}/zstreamtest.c)
+-set_property(TARGET zstreamtest APPEND PROPERTY COMPILE_OPTIONS "-Wno-deprecated-declarations")
++if (NOT WIN32)
++    set_property(TARGET zstreamtest APPEND PROPERTY COMPILE_OPTIONS "-Wno-deprecated-declarations")
++endif()
+ target_link_libraries(zstreamtest libzstd_static)
+ AddTestFlagsOption(ZSTD_ZSTREAM_FLAGS "$ENV{ZSTREAM_TESTTIME} $ENV{FUZZER_FLAGS}"
+     "Semicolon-separated list of flags to pass to the zstreamtest test (see `zstreamtest -h` for usage)")


[nifi-minifi-cpp] 03/04: MINIFICPP-2045 Synchronous flow file reloading

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7e8c5c49c0987bf3e2dcf1e0e16599c5cd7e69ec
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Feb 17 01:14:07 2023 +0100

    MINIFICPP-2045 Synchronous flow file reloading
    
    ... and orphan content cleanup at startup
    
    Closes #1509
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 CONFIGURE.md                                       |   1 -
 conf/minifi.properties                             |   1 -
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  33 ++++++
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +
 extensions/rocksdb-repos/FlowFileRepository.cpp    | 116 ++++----------------
 extensions/rocksdb-repos/FlowFileRepository.h      |  12 ---
 libminifi/include/core/ContentRepository.h         |   2 +
 .../include/core/repository/FileSystemRepository.h |   2 +
 .../core/repository/VolatileContentRepository.h    |   4 +
 libminifi/include/properties/Configuration.h       |   1 -
 libminifi/src/Configuration.cpp                    |   1 -
 .../src/core/repository/FileSystemRepository.cpp   |  13 +++
 libminifi/test/flow-tests/SessionTests.cpp         |   8 +-
 .../test/persistence-tests/PersistenceTests.cpp    |  10 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  39 +++++++
 libminifi/test/rocksdb-tests/EncryptionTests.cpp   |   9 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         | 120 ++++++++++++++++-----
 libminifi/test/unit/FileSystemRepositoryTests.cpp  |  24 +++++
 18 files changed, 242 insertions(+), 156 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index a56504f96..304407ecb 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -160,7 +160,6 @@ folder. You may specify your own path in place of these defaults.
     in minifi.properties
     nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
     nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
-    nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint
     nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 
 #### Shared database
diff --git a/conf/minifi.properties b/conf/minifi.properties
index dca8e39fb..491a94016 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -28,7 +28,6 @@ nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repositor
 nifi.provenance.repository.max.storage.time=1 MIN
 nifi.provenance.repository.max.storage.size=1 MB
 nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
-nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint
 nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 nifi.provenance.repository.class.name=NoOpRepository
 nifi.content.repository.class.name=DatabaseContentRepository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 0924534f0..881de1085 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "encryption/RocksDbEncryptionProvider.h"
 #include "RocksDbStream.h"
@@ -179,6 +180,38 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::R
   return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
 }
 
+void DatabaseContentRepository::clearOrphans() {
+  if (!is_valid_ || !db_) {
+    logger_->log_error("Cannot delete orphan content entries, repository is invalid");
+    return;
+  }
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_error("Cannot delete orphan content entries, could not open repository");
+    return;
+  }
+  std::vector<std::string> keys_to_be_deleted;
+  auto it = opendb->NewIterator(rocksdb::ReadOptions());
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    auto key = it->key().ToString();
+    auto claim_it = count_map_.find(key);
+    if (claim_it == count_map_.end() || claim_it->second == 0) {
+      logger_->log_error("Deleting orphan resource %s", key);
+      keys_to_be_deleted.push_back(key);
+    }
+  }
+  auto batch = opendb->createWriteBatch();
+  for (auto& key : keys_to_be_deleted) {
+    batch.Delete(key);
+  }
+
+  rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch);
+
+  if (!status.ok()) {
+    logger_->log_error("Could not delete orphan contents from rocksdb database: %s", status.ToString());
+  }
+}
+
 REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository"));
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 9961d2607..98f3acb79 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -67,6 +67,8 @@ class DatabaseContentRepository : public core::ContentRepository {
   bool remove(const minifi::ResourceClaim &claim) override;
   bool exists(const minifi::ResourceClaim &streamId) override;
 
+  void clearOrphans() override;
+
  private:
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 690a797a8..ecb9a1bcc 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -111,9 +111,6 @@ void FlowFileRepository::printStats() {
 
 void FlowFileRepository::run() {
   auto last = std::chrono::steady_clock::now();
-  if (isRunning()) {
-    prune_stored_flowfiles();
-  }
   while (isRunning()) {
     std::this_thread::sleep_for(purge_period_);
     flush();
@@ -126,38 +123,29 @@ void FlowFileRepository::run() {
   flush();
 }
 
-void FlowFileRepository::prune_stored_flowfiles() {
-  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, DbEncryptionOptions{checkpoint_dir_.string(), ENCRYPTION_KEY_NAME});
-  logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? "encrypted" : "plaintext");
-
-  auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
-    if (encrypted_env) {
-      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
-    } else {
-      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
-    }
-  };
-  auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, checkpoint_dir_.string(), minifi::internal::RocksDbMode::ReadOnly);
-  std::optional<minifi::internal::OpenRocksDb> opendb;
-  if (nullptr != checkpoint_) {
-    opendb = checkpointDB->open();
-    if (opendb) {
-      logger_->log_trace("Successfully opened checkpoint database at '%s'", checkpoint_dir_.string());
-    } else {
-      logger_->log_error("Couldn't open checkpoint database at '%s' using live database", checkpoint_dir_.string());
-      opendb = db_->open();
-    }
-    if (!opendb) {
-      logger_->log_trace("Could not open neither the checkpoint nor the live database.");
-      return;
+bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
+  constexpr int RETRY_COUNT = 3;
+  std::chrono::milliseconds wait_time = 0ms;
+  for (int i=0; i < RETRY_COUNT; ++i) {
+    auto status = operation();
+    if (status.ok()) {
+      logger_->log_trace("Rocksdb operation executed successfully");
+      return true;
     }
-  } else {
-    logger_->log_trace("Could not open checkpoint as object doesn't exist. Likely not needed or file system error.");
+    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
+    wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+void FlowFileRepository::initialize_repository() {
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_trace("Couldn't open database to load existing flow files");
     return;
   }
+  logger_->log_info("Reading existing flow files from database");
 
   auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -191,62 +179,8 @@ void FlowFileRepository::prune_stored_flowfiles() {
       keys_to_delete.enqueue(key);
     }
   }
-}
-
-bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
-  std::chrono::milliseconds waitTime = 0ms;
-  for (int i=0; i < 3; ++i) {
-    auto status = operation();
-    if (status.ok()) {
-      logger_->log_trace("Rocksdb operation executed successfully");
-      return true;
-    }
-    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
-    waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
-    std::this_thread::sleep_for(waitTime);
-  }
-  return false;
-}
-
-/**
- * Returns True if there is data to interrogate.
- * @return true if our db has data stored.
- */
-bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& opendb) {
-  auto it = opendb.NewIterator(rocksdb::ReadOptions());
-  it->SeekToFirst();
-  return it->Valid();
-}
-void FlowFileRepository::initialize_repository() {
-  checkpoint_.reset();
-  auto opendb = db_->open();
-  if (!opendb) {
-    logger_->log_trace("Couldn't open database, no way to checkpoint");
-    return;
-  }
-  // first we need to establish a checkpoint iff it is needed.
-  if (!need_checkpoint(*opendb)) {
-    logger_->log_trace("Do not need checkpoint");
-    return;
-  }
-  // delete any previous copy
-  if (utils::file::delete_dir(checkpoint_dir_) < 0) {
-    logger_->log_error("Could not delete existing checkpoint directory '%s'", checkpoint_dir_.string());
-    return;
-  }
-  std::unique_ptr<rocksdb::Checkpoint> checkpoint;
-  rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint);
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not create checkpoint object: %s", checkpoint_status.ToString());
-    return;
-  }
-  checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_.string());
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not initialize checkpoint: %s", checkpoint_status.ToString());
-    return;
-  }
-  checkpoint_ = std::move(checkpoint);
-  logger_->log_trace("Created checkpoint in directory '%s'", checkpoint_dir_.string());
+  flush();
+  content_repo_->clearOrphans();
 }
 
 void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
@@ -266,12 +200,6 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
   }
   logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
 
-  value.clear();
-  if (configure->get(Configure::nifi_flowfile_checkpoint_directory_default, value) && !value.empty()) {
-    checkpoint_dir_ = value;
-  }
-  logger_->log_debug("NiFi FlowFile Checkpoint Directory %s", checkpoint_dir_.string());
-
   const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index aeda5af9c..832fde642 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -66,13 +66,11 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
   }
 
   explicit FlowFileRepository(const std::string& repo_name = "",
-                              std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
                               std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
                               std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
                               int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
                               std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
     : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod),
-      checkpoint_dir_(std::move(checkpoint_dir)),
       logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
   }
 
@@ -112,23 +110,13 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
 
   void initialize_repository();
 
-  /**
-   * Returns true if a checkpoint is needed at startup
-   * @return true if a checkpoint is needed.
-   */
-  static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb);
-
-  void prune_stored_flowfiles();
-
   std::thread& getThread() override {
     return thread_;
   }
 
-  std::filesystem::path checkpoint_dir_;
   moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   std::shared_ptr<core::ContentRepository> content_repo_;
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
-  std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
   std::unique_ptr<FlowFileLoader> swap_loader_;
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<minifi::Configure> config_;
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 84ac4eded..7d92634fb 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -51,6 +51,8 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
   void incrementStreamCount(const minifi::ResourceClaim &streamId) override;
   StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) override;
 
+  virtual void clearOrphans() = 0;
+
  protected:
   std::string directory_;
   std::mutex count_map_mutex_;
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 66cec39ef..01926f5bd 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -49,6 +49,8 @@ class FileSystemRepository : public core::ContentRepository {
   bool remove(const minifi::ResourceClaim& claim) override;
   std::shared_ptr<ContentSession> createSession() override;
 
+  void clearOrphans() override;
+
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 54c1dcc6f..62e9fd80b 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -94,6 +94,10 @@ class VolatileContentRepository : public core::ContentRepository {
    */
   bool remove(const minifi::ResourceClaim &claim) override;
 
+  void clearOrphans() override {
+    // there are no persisted orphans to delete
+  }
+
  private:
   VolatileRepositoryData repo_data_;
   bool minimize_locking_;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index eb2f598dc..25e6b5ebc 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -66,7 +66,6 @@ class Configuration : public Properties {
   static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
   static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
   static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
-  static constexpr const char *nifi_flowfile_checkpoint_directory_default = "nifi.flowfile.checkpoint.directory.default";
   static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
   static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
   static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 69b24e9e8..78ae790f7 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -51,7 +51,6 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default},
   core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default},
-  core::ConfigurationProperty{Configuration::nifi_flowfile_checkpoint_directory_default},
   core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default},
   core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index b41cbcfaf..034e9c36a 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -59,4 +59,17 @@ std::shared_ptr<ContentSession> FileSystemRepository::createSession() {
   return std::make_shared<ForwardingContentSession>(sharedFromThis());
 }
 
+void FileSystemRepository::clearOrphans() {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  utils::file::list_dir(directory_, [&] (auto& /*dir*/, auto& filename) {
+    auto path = directory_ +  "/" + filename.string();
+    auto it = count_map_.find(path);
+    if (it == count_map_.end() || it->second == 0) {
+      logger_->log_debug("Deleting orphan resource %s", path);
+      std::remove(path.c_str());
+    }
+    return true;
+  }, logger_, false);
+}
+
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
index 59cd8e26b..b722e93b1 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -46,12 +46,6 @@ class TestProcessor : public minifi::core::Processor {
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 };
 
-#ifdef WIN32
-const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = ".\\sessiontest_flowfile_checkpoint";
-#else
-const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = "./sessiontest_flowfile_checkpoint";
-#endif
-
 TEST_CASE("Import null data") {
   TestController testController;
   LogTestController::getInstance().setDebug<core::ContentRepository>();
@@ -69,7 +63,7 @@ TEST_CASE("Import null data") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = core::createRepository("nooprepository");
-  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", SESSIONTEST_FLOWFILE_CHECKPOINT_DIR);
+  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("VolatileContentRepository") {
     testController.getLogger()->log_info("Using VolatileContentRepository");
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 530aa06c6..e417ae417 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -53,12 +53,6 @@ class TestProcessor : public minifi::core::Processor {
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 };
 
-#ifdef WIN32
-const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = ".\\persistencetest_flowfile_checkpoint";
-#else
-const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = "./persistencetest_flowfile_checkpoint";
-#endif
-
 struct TestFlow{
   TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
         const std::function<std::unique_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
@@ -179,7 +173,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
-  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
   ff_repository->initialize(config);
   content_repo->initialize(config);
@@ -286,7 +280,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
-  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("VolatileContentRepository") {
     testController.getLogger()->log_info("Using VolatileContentRepository");
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 0a50cd129..942f1544e 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -242,3 +242,42 @@ TEST_CASE("ProcessSession::append should append to the flowfile and set its size
 TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (RocksDB)", "[zerolengthread]") {
   ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
 }
+
+size_t getDbSize(const std::filesystem::path& dir) {
+  auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string());
+  auto opendb = db->open();
+  REQUIRE(opendb);
+
+  size_t count = 0;
+  auto it = opendb->NewIterator({});
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    ++count;
+  }
+  return count;
+}
+
+TEST_CASE("DBContentRepository can clear orphan entries") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
+  {
+    auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+    REQUIRE(content_repo->initialize(configuration));
+
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    // ensure that the content is not deleted during resource claim destruction
+    content_repo->incrementStreamCount(claim);
+  }
+
+  REQUIRE(getDbSize(dir) == 1);
+
+  {
+    auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+    REQUIRE(content_repo->initialize(configuration));
+    content_repo->clearOrphans();
+  }
+
+  REQUIRE(getDbSize(dir) == 0);
+}
diff --git a/libminifi/test/rocksdb-tests/EncryptionTests.cpp b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
index 2fff68980..b544d7016 100644
--- a/libminifi/test/rocksdb-tests/EncryptionTests.cpp
+++ b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
@@ -33,7 +33,6 @@ class FFRepoFixture : public TestController {
     LogTestController::getInstance().setTrace<FlowFileRepository>();
     home_ = createTempDirectory();
     repo_dir_ = home_ / "flowfile_repo";
-    checkpoint_dir_ = home_ / "checkpoint_dir";
     config_ = std::make_shared<minifi::Configure>();
     config_->setHome(home_);
     container_ = std::make_unique<minifi::Connection>(nullptr, nullptr, "container");
@@ -50,7 +49,7 @@ class FFRepoFixture : public TestController {
 
   template<typename Fn>
   void runWithNewRepository(Fn&& fn) {
-    auto repository = std::make_shared<FlowFileRepository>("ff", checkpoint_dir_, repo_dir_.string());
+    auto repository = std::make_shared<FlowFileRepository>("ff", repo_dir_.string());
     repository->initialize(config_);
     std::map<std::string, core::Connectable*> container_map;
     container_map[container_->getUUIDStr()] = container_.get();
@@ -65,7 +64,6 @@ class FFRepoFixture : public TestController {
   std::unique_ptr<minifi::Connection> container_;
   std::filesystem::path home_;
   std::filesystem::path repo_dir_;
-  std::filesystem::path checkpoint_dir_;
   std::shared_ptr<minifi::Configure> config_;
   std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
 };
@@ -93,14 +91,11 @@ TEST_CASE_METHOD(FFRepoFixture, "FlowFileRepository creates checkpoint and loads
   REQUIRE(container_->isEmpty());
 
   runWithNewRepository([&] (const std::shared_ptr<core::repository::FlowFileRepository>& /*repo*/) {
-    // wait for the flowfiles to be loaded from the checkpoint
+    // wait for the flowfiles to be loaded
     bool success = utils::verifyEventHappenedInPollTime(std::chrono::seconds{5}, [&] {
       return !container_->isEmpty();
     });
     REQUIRE(success);
-    REQUIRE(utils::verifyLogLinePresenceInPollTime(
-        std::chrono::seconds{5},
-        "Successfully opened checkpoint database at '" + checkpoint_dir_.string() + "'"));
     std::set<std::shared_ptr<core::FlowFile>> expired;
     auto flowfile = container_->poll(expired);
     REQUIRE(expired.empty());
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index e280c0432..ef4d073bf 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -39,12 +39,6 @@ using namespace std::literals::chrono_literals;
 
 namespace {
 
-#ifdef WIN32
-const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = ".\\repotest_flowfile_checkpoint";
-#else
-const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = "./repotest_flowfile_checkpoint";
-#endif
-
 namespace {
 class TestProcessor : public minifi::core::Processor {
  public:
@@ -72,7 +66,7 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
   LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   repository->initialize(std::make_shared<minifi::Configure>());
 
@@ -83,8 +77,6 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
 
   REQUIRE(true == file->Persist(repository));
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   repository->stop();
 }
 
@@ -94,7 +86,7 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
   LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   repository->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
@@ -106,8 +98,6 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
 
   REQUIRE(true == file->Persist(repository));
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   repository->stop();
 }
 
@@ -117,7 +107,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
   LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
 
@@ -154,8 +144,6 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
 
   REQUIRE(record2->getAttribute("keyB", value));
   REQUIRE(value.empty());
-
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
 }
 
 TEST_CASE("Test Delete Content ", "[TestFFR4]") {
@@ -167,7 +155,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
 
   auto dir = testController.createTempDirectory();
 
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   std::fstream file;
   file.open(dir / "tstFile.ext", std::ios::out);
@@ -203,14 +191,11 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
   std::ifstream fileopen(dir / "tstFile.ext", std::ios::in);
   REQUIRE(!fileopen.good());
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   LogTestController::getInstance().reset();
 }
 
 TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   TestController testController;
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
 
   LogTestController::getInstance().setDebug<core::ContentRepository>();
   LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
@@ -219,7 +204,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
 
   auto dir = testController.createTempDirectory();
 
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   std::fstream file;
   file.open(dir / "tstFile.ext", std::ios::out);
@@ -264,8 +249,6 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   std::ifstream fileopen(dir / "tstFile.ext", std::ios::in);
   REQUIRE(fileopen.fail());
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   LogTestController::getInstance().reset();
 }
 
@@ -284,7 +267,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
-  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", REPOTEST_FLOWFILE_CHECKPOINT_DIR);
+  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
   ff_repository->initialize(config);
   content_repo->initialize(config);
@@ -361,7 +344,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
   class TestFlowFileRepository: public core::repository::FlowFileRepository{
    public:
     explicit TestFlowFileRepository(const std::string& name)
-      : FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
+      : FlowFileRepository(name, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
                            10min, core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
 
     void flush() override {
@@ -438,4 +421,93 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
   }
 }
 
+TEST_CASE("FlowFileRepository triggers content repo orphan clear") {
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  TestController testController;
+  auto ff_dir = testController.createTempDirectory();
+  auto content_dir = testController.createTempDirectory();
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string());
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string());
+
+  {
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(config));
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    // ensure that the content is not deleted during resource claim destruction
+    content_repo->incrementStreamCount(claim);
+  }
+
+  REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).size() == 1);
+
+  auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+  REQUIRE(ff_repo->initialize(config));
+  auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  REQUIRE(content_repo->initialize(config));
+
+  ff_repo->loadComponent(content_repo);
+
+  REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).empty());
+}
+
+TEST_CASE("FlowFileRepository synchronously pushes existing flow files") {
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  TestController testController;
+  auto ff_dir = testController.createTempDirectory();
+  auto content_dir = testController.createTempDirectory();
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string());
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string());
+
+
+  utils::Identifier ff_id;
+  auto connection_id = utils::IdGenerator::getIdGenerator()->generate();
+
+  {
+    auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+    REQUIRE(ff_repo->initialize(config));
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(config));
+    auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id);
+
+    auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
+
+    std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>> flow_data;
+    auto ff = std::make_shared<minifi::FlowFileRecord>();
+    ff_id = ff->getUUID();
+    ff->setConnection(conn.get());
+    content_repo->write(*claim)->write("hello");
+    ff->setResourceClaim(claim);
+    auto stream = std::make_unique<minifi::io::BufferStream>();
+    ff->Serialize(*stream);
+    flow_data.emplace_back(ff->getUUIDStr(), std::move(stream));
+
+    REQUIRE(ff_repo->MultiPut(flow_data));
+  }
+
+  {
+    auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+    REQUIRE(ff_repo->initialize(config));
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(config));
+    auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id);
+
+    ff_repo->setConnectionMap({{connection_id.to_string(), conn.get()}});
+    ff_repo->loadComponent(content_repo);
+
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    std::shared_ptr<core::FlowFile> ff = conn->poll(expired);
+    REQUIRE(expired.empty());
+    REQUIRE(ff);
+    REQUIRE(ff->getUUID() == ff_id);
+  }
+}
+
 }  // namespace
diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp
index b49fe60c5..c61579dd9 100644
--- a/libminifi/test/unit/FileSystemRepositoryTests.cpp
+++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp
@@ -58,3 +58,27 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
       return end_memory < start_memory + int64_t{5_MB};
     }, 100ms));
 }
+
+TEST_CASE("FileSystemRepository can clear orphan entries") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
+  {
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(configuration));
+
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    // ensure that the content is not deleted during resource claim destruction
+    content_repo->incrementStreamCount(claim);
+  }
+
+  REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).size() == 1);
+
+  auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+  content_repo->clearOrphans();
+
+  REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).empty());
+}