You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/05/11 15:20:59 UTC

[nifi-minifi-cpp] branch main updated (849aae6f8 -> 4a98b0f62)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 849aae6f8 Bump commons-io in /extensions/sftp/tests/tools/sftp-test-server
     new 6e573f737 MINIFICPP-1687 Signal error on UUID collision
     new 9cc12971c MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP
     new 740fcc773 MINIFICPP-1765 Instantiate kind cluster only once in a test run
     new 4a98b0f62 MINIFICPP-1823 Fix absolute.path output attribute in ListFile

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 PROCESSORS.md                                      |   1 +
 .../integration/MiNiFi_integration_test_driver.py  |  15 +-
 docker/test/integration/environment.py             |  26 ++-
 .../features/file_system_operations.feature        |  15 +-
 .../test/integration/features/kubernetes.feature   |   1 +
 .../integration/minifi/core/DockerTestCluster.py   |   4 +-
 .../minifi/core/DockerTestDirectoryBindings.py     |  37 ++--
 .../core/{KindProxy.py => KubernetesProxy.py}      |  29 ++-
 .../minifi/core/MinifiAsPodInKubernetesCluster.py  |  19 +-
 .../integration/minifi/core/OutputEventHandler.py  |   3 -
 .../minifi/core/SingleNodeDockerCluster.py         |   7 +-
 .../{ExecutePythonProcessor.py => FetchFile.py}    |   7 +-
 .../{GenerateFlowFile.py => ListFile.py}           |   6 +-
 docker/test/integration/steps/steps.py             |   4 +-
 extensions/http-curl/client/HTTPClient.cpp         |  33 ++++
 extensions/http-curl/client/HTTPClient.h           |   4 +
 extensions/http-curl/processors/InvokeHTTP.cpp     | 211 +++++++++++----------
 extensions/http-curl/processors/InvokeHTTP.h       |  76 ++------
 .../http-curl/tests/unit/HTTPClientTests.cpp       |  18 ++
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |  92 ++++++++-
 .../standard-processors/processors/ListFile.cpp    |  45 ++---
 .../standard-processors/processors/ListFile.h      |   1 +
 .../tests/unit/ListFileTests.cpp                   |  15 +-
 .../tests/unit/YamlConfigurationTests.cpp          |  64 +++++++
 libminifi/include/core/yaml/CheckRequiredField.h   |   4 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |   4 +
 libminifi/src/core/yaml/CheckRequiredField.cpp     |  12 +-
 libminifi/src/core/yaml/YamlConfiguration.cpp      |  69 ++++---
 libminifi/test/resources/TestHTTPSiteToSite.yml    |   8 +-
 29 files changed, 537 insertions(+), 293 deletions(-)
 rename docker/test/integration/minifi/core/{KindProxy.py => KubernetesProxy.py} (81%)
 copy docker/test/integration/minifi/processors/{ExecutePythonProcessor.py => FetchFile.py} (85%)
 copy docker/test/integration/minifi/processors/{GenerateFlowFile.py => ListFile.py} (89%)


[nifi-minifi-cpp] 03/04: MINIFICPP-1765 Instantiate kind cluster only once in a test run

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 740fcc7739643fb50e74e19a8d0255224367b938
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 17:17:49 2022 +0200

    MINIFICPP-1765 Instantiate kind cluster only once in a test run
    
    Closes #1322
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../integration/MiNiFi_integration_test_driver.py  | 15 +++------
 docker/test/integration/environment.py             | 26 ++++++++++++++-
 .../test/integration/features/kubernetes.feature   |  1 +
 .../integration/minifi/core/DockerTestCluster.py   |  4 +--
 .../minifi/core/DockerTestDirectoryBindings.py     | 37 ++++++++++++++--------
 .../core/{KindProxy.py => KubernetesProxy.py}      | 29 ++++++++++++-----
 .../minifi/core/MinifiAsPodInKubernetesCluster.py  | 19 ++++-------
 .../integration/minifi/core/OutputEventHandler.py  |  3 --
 .../minifi/core/SingleNodeDockerCluster.py         |  7 ++--
 docker/test/integration/steps/steps.py             |  4 ++-
 10 files changed, 92 insertions(+), 53 deletions(-)

diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 363ade89b..f450bf751 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -23,7 +23,6 @@ from pydoc import locate
 from minifi.core.InputPort import InputPort
 
 from minifi.core.DockerTestCluster import DockerTestCluster
-from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
 
 from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
 from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
@@ -37,18 +36,17 @@ from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutput
 from minifi.core.utils import decode_escaped_str
 
 
-class MiNiFi_integration_test():
-    def __init__(self, image_store):
-        self.test_id = str(uuid.uuid4())
-        self.cluster = DockerTestCluster(image_store)
+class MiNiFi_integration_test:
+    def __init__(self, context):
+        self.test_id = context.test_id
+        self.cluster = DockerTestCluster(context)
 
         self.connectable_nodes = []
         # Remote process groups are not connectables
         self.remote_process_groups = []
         self.file_system_observer = None
 
-        self.docker_directory_bindings = DockerTestDirectoryBindings()
-        self.docker_directory_bindings.create_new_data_directories(self.test_id)
+        self.docker_directory_bindings = context.directory_bindings
         self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id), self.docker_directory_bindings.get_data_directories(self.test_id))
 
     def __del__(self):
@@ -57,9 +55,6 @@ class MiNiFi_integration_test():
     def cleanup(self):
         self.cluster.cleanup()
 
-    def docker_path_to_local_path(self, docker_path):
-        return self.docker_directory_bindings.docker_path_to_local_path(self.test_id, docker_path)
-
     def acquire_container(self, name, engine='minifi-cpp', command=None):
         return self.cluster.acquire_container(name, engine, command)
 
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
index a53b86aab..ce1b5fd5a 100644
--- a/docker/test/integration/environment.py
+++ b/docker/test/integration/environment.py
@@ -17,11 +17,15 @@
 import logging
 import datetime
 import sys
+import uuid
+import os
 sys.path.append('../minifi')
 
 from MiNiFi_integration_test_driver import MiNiFi_integration_test  # noqa: E402
 from minifi import *  # noqa
 from minifi.core.ImageStore import ImageStore # noqa
+from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings # noqa
+from minifi.core.KubernetesProxy import KubernetesProxy # noqa
 
 
 def before_scenario(context, scenario):
@@ -30,7 +34,7 @@ def before_scenario(context, scenario):
         return
 
     logging.info("Integration test setup at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
-    context.test = MiNiFi_integration_test(context.image_store)
+    context.test = MiNiFi_integration_test(context)
 
 
 def after_scenario(context, scenario):
@@ -40,8 +44,28 @@ def after_scenario(context, scenario):
 
     logging.info("Integration test teardown at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
     context.test.cleanup()
+    context.directory_bindings.cleanup_io()
+    if context.kubernetes_proxy:
+        context.kubernetes_proxy.delete_pods()
 
 
 def before_all(context):
+    context.test_id = str(uuid.uuid4())
     context.config.setup_logging()
     context.image_store = ImageStore()
+    context.directory_bindings = DockerTestDirectoryBindings(context.test_id)
+    context.directory_bindings.create_new_data_directories()
+    context.kubernetes_proxy = None
+
+
+def before_tag(context, tag):
+    if tag == "requires.kubernetes.cluster":
+        context.kubernetes_proxy = KubernetesProxy(context.directory_bindings.get_data_directories(context.test_id)["kubernetes_temp_dir"], os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc'))
+        context.kubernetes_proxy.create_config(context.directory_bindings.get_directory_bindings(context.test_id))
+        context.kubernetes_proxy.start_cluster()
+
+
+def after_tag(context, tag):
+    if tag == "requires.kubernetes.cluster" and context.kubernetes_proxy:
+        context.kubernetes_proxy.cleanup()
+        context.kubernetes_proxy = None
diff --git a/docker/test/integration/features/kubernetes.feature b/docker/test/integration/features/kubernetes.feature
index 5d14b1982..5a24755e3 100644
--- a/docker/test/integration/features/kubernetes.feature
+++ b/docker/test/integration/features/kubernetes.feature
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+@requires.kubernetes.cluster
 Feature: TailFile can collect logs from Kubernetes pods
 
   Background:
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index c29e0baa5..7158b7a45 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -36,8 +36,8 @@ class DockerTestCluster(SingleNodeDockerCluster):
         ("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
          "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;")
 
-    def __init__(self, image_store):
-        super(DockerTestCluster, self).__init__(image_store)
+    def __init__(self, context):
+        super(DockerTestCluster, self).__init__(context)
         self.segfault = False
 
     @staticmethod
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 91a13bb2a..3781138a9 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -20,29 +20,40 @@ import shutil
 
 
 class DockerTestDirectoryBindings:
-    def __init__(self):
+    def __init__(self, test_id):
         self.data_directories = {}
+        self.test_id = test_id
 
     def __del__(self):
         self.delete_data_directories()
 
-    def create_new_data_directories(self, test_id):
-        self.data_directories[test_id] = {
-            "input_dir": "/tmp/.nifi-test-input." + test_id,
-            "output_dir": "/tmp/.nifi-test-output." + test_id,
-            "resources_dir": "/tmp/.nifi-test-resources." + test_id,
-            "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + test_id,
-            "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + test_id
+    def cleanup_io(self):
+        for folder in [self.data_directories[self.test_id]["input_dir"], self.data_directories[self.test_id]["output_dir"]]:
+            for filename in os.listdir(folder):
+                file_path = os.path.join(folder, filename)
+                if os.path.isfile(file_path) or os.path.islink(file_path):
+                    os.unlink(file_path)
+                elif os.path.isdir(file_path):
+                    shutil.rmtree(file_path)
+
+    def create_new_data_directories(self):
+        self.data_directories[self.test_id] = {
+            "input_dir": "/tmp/.nifi-test-input." + self.test_id,
+            "output_dir": "/tmp/.nifi-test-output." + self.test_id,
+            "resources_dir": "/tmp/.nifi-test-resources." + self.test_id,
+            "minifi_config_dir": "/tmp/.nifi-test-minifi-config-dir." + self.test_id,
+            "nifi_config_dir": "/tmp/.nifi-test-nifi-config-dir." + self.test_id,
+            "kubernetes_temp_dir": "/tmp/.nifi-test-kubernetes-temp-dir." + self.test_id
         }
 
-        [self.create_directory(directory) for directory in self.data_directories[test_id].values()]
+        [self.create_directory(directory) for directory in self.data_directories[self.test_id].values()]
 
         # Add resources
         test_dir = os.environ['TEST_DIRECTORY']  # Based on DockerVerify.sh
-        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
-        shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
-        shutil.copytree(test_dir + "/resources/opcua", self.data_directories[test_id]["resources_dir"] + "/opcua")
-        shutil.copytree(test_dir + "/resources/lua", self.data_directories[test_id]["resources_dir"] + "/lua")
+        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[self.test_id]["resources_dir"] + "/certs")
+        shutil.copytree(test_dir + "/resources/python", self.data_directories[self.test_id]["resources_dir"] + "/python")
+        shutil.copytree(test_dir + "/resources/opcua", self.data_directories[self.test_id]["resources_dir"] + "/opcua")
+        shutil.copytree(test_dir + "/resources/lua", self.data_directories[self.test_id]["resources_dir"] + "/lua")
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/KindProxy.py b/docker/test/integration/minifi/core/KubernetesProxy.py
similarity index 81%
rename from docker/test/integration/minifi/core/KindProxy.py
rename to docker/test/integration/minifi/core/KubernetesProxy.py
index b8da30bd5..0af95fc81 100644
--- a/docker/test/integration/minifi/core/KindProxy.py
+++ b/docker/test/integration/minifi/core/KubernetesProxy.py
@@ -23,7 +23,7 @@ import time
 from textwrap import dedent
 
 
-class KindProxy:
+class KubernetesProxy:
     def __init__(self, temp_directory, resources_directory):
         self.temp_directory = temp_directory
         self.resources_directory = resources_directory
@@ -33,6 +33,12 @@ class KindProxy:
         self.__download_kind()
         self.docker_client = docker.from_env()
 
+    def __del__(self):
+        self.cleanup()
+
+    def cleanup(self):
+        subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
+
     def __download_kind(self):
         if subprocess.run(['curl', '-Lo', self.kind_binary_path, 'https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64']).returncode != 0:
             raise Exception("Could not download kind")
@@ -78,9 +84,12 @@ class KindProxy:
         self.__create_objects_of_type(self.resources_directory, 'clusterrole')
         self.__create_objects_of_type(self.resources_directory, 'clusterrolebinding')
 
+    def delete_pods(self):
+        self.__delete_objects_of_type('pod')
+
     def __wait_for_default_service_account(self, namespace):
         for _ in range(120):
-            (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
+            (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', '-n', namespace, 'get', 'serviceaccount', 'default'])
             if code == 0:
                 return
             time.sleep(1)
@@ -93,7 +102,7 @@ class KindProxy:
             file_name_in_container = os.path.join('/var/tmp', file_name)
             self.__copy_file_to_container(full_file_name, 'kind-control-plane', file_name_in_container)
 
-            (code, output) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
+            (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'apply', '-f', file_name_in_container])
             if code != 0:
                 raise Exception("Could not create kubernetes object from file '%s'" % full_file_name)
 
@@ -101,6 +110,15 @@ class KindProxy:
             found_objects.append(object_name)
         return found_objects
 
+    def __delete_objects_of_type(self, type):
+        for full_file_name in glob.iglob(os.path.join(self.resources_directory, f'*.{type}.yml')):
+            file_name = os.path.basename(full_file_name)
+            file_name_in_container = os.path.join('/var/tmp', file_name)
+
+            (code, _) = self.docker_client.containers.get('kind-control-plane').exec_run(['kubectl', 'delete', '-f', file_name_in_container, '--grace-period=0', '--force'])
+            if code != 0:
+                raise Exception("Could not delete kubernetes object from file '%s'" % file_name_in_container)
+
     def __copy_file_to_container(self, host_file, container_name, container_file):
         if subprocess.run(['docker', 'cp', host_file, container_name + ':' + container_file]).returncode != 0:
             raise Exception("Could not copy file '%s' into container '%s' as '%s'" % (host_file, container_name, container_file))
@@ -111,8 +129,3 @@ class KindProxy:
             return output
         else:
             return None
-
-    def cleanup(self):
-        # cleanup gets called multiple times, also after the temp directories had been removed
-        if os.path.exists(self.kind_binary_path):
-            subprocess.run([self.kind_binary_path, 'delete', 'cluster'])
diff --git a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
index 81ba5b02e..e9c821367 100644
--- a/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
+++ b/docker/test/integration/minifi/core/MinifiAsPodInKubernetesCluster.py
@@ -19,7 +19,6 @@ import logging
 import os
 import shutil
 
-from .KindProxy import KindProxy
 from .LogSource import LogSource
 from .MinifiContainer import MinifiContainer
 
@@ -28,11 +27,9 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
     MINIFI_IMAGE_NAME = 'apacheminificpp'
     MINIFI_IMAGE_TAG = 'docker_test'
 
-    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+    def __init__(self, kubernetes_proxy, config_dir, name, vols, network, image_store, command=None):
         super().__init__(config_dir, name, vols, network, image_store, command)
-
-        resources_directory = os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc')
-        self.kind = KindProxy(self.config_dir, resources_directory)
+        self.kubernetes_proxy = kubernetes_proxy
 
         test_dir = os.environ['TEST_DIRECTORY']
         shutil.copy(os.path.join(test_dir, os.pardir, os.pardir, os.pardir, 'conf', 'minifi.properties'), self.config_dir)
@@ -50,10 +47,8 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
 
         self._create_config()
 
-        self.kind.create_config(self.vols)
-        self.kind.start_cluster()
-        self.kind.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
-        self.kind.create_objects()
+        self.kubernetes_proxy.load_docker_image(MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_NAME, MinifiAsPodInKubernetesCluster.MINIFI_IMAGE_TAG)
+        self.kubernetes_proxy.create_objects()
 
         logging.info('Finished setting up container: %s', self.name)
 
@@ -61,8 +56,8 @@ class MinifiAsPodInKubernetesCluster(MinifiContainer):
         return LogSource.FROM_GET_APP_LOG_METHOD
 
     def get_app_log(self):
-        return 'OK', self.kind.get_logs('daemon', 'log-collector')
+        return 'OK', self.kubernetes_proxy.get_logs('daemon', 'log-collector')
 
     def cleanup(self):
-        logging.info('Cleaning up container: %s', self.name)
-        self.kind.cleanup()
+        # cleanup is done through the kubernetes cluster in the environment.py
+        pass
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py
index 735295fd8..f4d570298 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -68,6 +68,3 @@ class OutputEventHandler(FileSystemEventHandler):
 
             if file_count_modified:
                 self.done_event.set()
-
-    def on_deleted(self, event):
-        logging.info("Output file deleted: " + event.src_path)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 50dc9f3eb..b622685b0 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -44,12 +44,13 @@ class SingleNodeDockerCluster(Cluster):
     testing or use-cases which do not span multiple compute nodes.
     """
 
-    def __init__(self, image_store):
+    def __init__(self, context):
         self.vols = {}
         self.network = self.create_docker_network()
         self.containers = {}
-        self.image_store = image_store
+        self.image_store = context.image_store
         self.data_directories = {}
+        self.kubernetes_proxy = context.kubernetes_proxy
 
         # Get docker client
         self.client = docker.from_env()
@@ -91,7 +92,7 @@ class SingleNodeDockerCluster(Cluster):
         elif engine == 'minifi-cpp':
             return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'kubernetes':
-            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
+            return self.containers.setdefault(name, MinifiAsPodInKubernetesCluster(self.kubernetes_proxy, self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'transient-minifi':
             return self.containers.setdefault(name, TransientMinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'minifi-cpp-with-provenance-repo':
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 8a64acadf..f588c8cc7 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -44,7 +44,7 @@ import os
 # Background
 @given("the content of \"{directory}\" is monitored")
 def step_impl(context, directory):
-    context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
+    context.test.add_file_system_observer(FileSystemObserver(context.directory_bindings.docker_path_to_local_path(context.test_id, directory)))
 
 
 def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
@@ -394,12 +394,14 @@ def step_impl(context):
 def step_impl(context):
     context.test.acquire_container("azure-storage-server", "azure-storage-server")
 
+
 # syslog client
 @given(u'a Syslog client with {protocol} protocol is setup to send logs to minifi')
 def step_impl(context, protocol):
     client_name = "syslog-" + protocol.lower() + "-client"
     context.test.acquire_container(client_name, client_name)
 
+
 # google cloud storage setup
 @given("a Google Cloud storage server is set up with some test data")
 @given("a Google Cloud storage server is set up")


[nifi-minifi-cpp] 02/04: MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9cc12971c94d6d0958ecdd2da8a63e004aa01e8c
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 17:15:45 2022 +0200

    MINIFICPP-1787 Add option to fix invalid attributes in HTTP header of InvokeHTTP
    
    Closes #1321
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |   1 +
 extensions/http-curl/client/HTTPClient.cpp         |  33 ++++
 extensions/http-curl/client/HTTPClient.h           |   4 +
 extensions/http-curl/processors/InvokeHTTP.cpp     | 211 +++++++++++----------
 extensions/http-curl/processors/InvokeHTTP.h       |  76 ++------
 .../http-curl/tests/unit/HTTPClientTests.cpp       |  18 ++
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |  92 ++++++++-
 7 files changed, 269 insertions(+), 166 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index d97f8752d..88a074211 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -924,6 +924,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 |Follow Redirects|true||Follow HTTP redirects issued by remote server.|
 |HTTP Method|GET||HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.|
 |Include Date Header|true||Include an RFC-2616 Date header in the request.|
+|**Invalid HTTP Header Field Handling Strategy**|transform|transform<br/>fail<br/>drop|Indicates what should happen when an attribute's name is not a valid HTTP header field name.<br/>Options:<br/>transform - invalid characters are replaced<br/>fail - flow file is transferred to failure<br/>drop - drops invalid attributes from HTTP message|
 |invokehttp-proxy-password|||Password to set when authenticating against proxy|
 |invokehttp-proxy-username|||Username to set when authenticating against proxy|
 |Penalize on "No Retry"|false||Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship.|
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 16298aa24..eeec4194f 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -455,6 +455,39 @@ void HTTPClient::setFollowRedirects(bool follow) {
   curl_easy_setopt(http_session_, CURLOPT_FOLLOWLOCATION, follow);
 }
 
+bool HTTPClient::isValidHttpHeaderField(std::string_view field_name) {
+  if (field_name.size() == 0) {
+    return false;
+  }
+
+  // RFC822 3.1.2: The  field-name must be composed of printable ASCII characters
+  // (i.e., characters that  have  values  between  33.  and  126., decimal, except colon).
+  for (auto ch : field_name) {
+    if (ch < 33 || ch > 126 || ch == ':') {
+      return false;
+    }
+  }
+  return true;
+}
+
+std::string HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name) {
+  if (field_name.size() == 0) {
+    return "X-MiNiFi-Empty-Attribute-Name";
+  }
+
+  std::string result;
+  // RFC822 3.1.2: The  field-name must be composed of printable ASCII characters
+  // (i.e., characters that  have  values  between  33.  and  126., decimal, except colon).
+  for (auto ch : field_name) {
+    if (ch < 33 || ch > 126 || ch == ':') {
+      result += '-';
+    } else {
+      result += ch;
+    }
+  }
+  return result;
+}
+
 REGISTER_INTERNAL_RESOURCE(HTTPClient);
 
 }  // namespace utils
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 05a24e144..7159895c2 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -48,6 +48,7 @@
 #else
 #include <regex.h>
 #endif
+#include <string_view>
 
 #include "utils/ByteArrayCallback.h"
 #include "controllers/SSLContextService.h"
@@ -238,6 +239,9 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
     }
   }
 
+  static bool isValidHttpHeaderField(std::string_view field_name);
+  static std::string replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name);
+
  private:
   static int onProgress(void *client, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
 
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 3a4516883..aea10f730 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -17,11 +17,7 @@
  */
 
 #include "InvokeHTTP.h"
-#ifdef WIN32
-#include <regex>
-#else
-#include <regex.h>
-#endif
+
 #include <memory>
 #include <cinttypes>
 #include <cstdint>
@@ -39,14 +35,10 @@
 #include "ResourceClaim.h"
 #include "utils/gsl.h"
 #include "utils/StringUtils.h"
+#include "utils/ProcessorConfigUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
-const char *InvokeHTTP::ProcessorName = "InvokeHTTP";
 std::string InvokeHTTP::DefaultContentType = "application/octet-stream";
 
 core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
@@ -115,6 +107,16 @@ core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will
 core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false");
 
 core::Property InvokeHTTP::DisablePeerVerification("Disable Peer Verification", "Disables peer verification for the SSL session", "false");
+
+core::Property InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy(
+    core::PropertyBuilder::createProperty("Invalid HTTP Header Field Handling Strategy")
+      ->withDescription("Indicates what should happen when an attribute's name is not a valid HTTP header field name. "
+        "Options: transform - invalid characters are replaced, fail - flow file is transferred to failure, drop - drops invalid attributes from HTTP message")
+      ->isRequired(true)
+      ->withDefaultValue<std::string>(toString(InvalidHTTPHeaderFieldHandlingOption::TRANSFORM))
+      ->withAllowableValues<std::string>(InvalidHTTPHeaderFieldHandlingOption::values())
+      ->build());
+
 const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code";
 const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message";
 const char* InvokeHTTP::RESPONSE_BODY = "invokehttp.response.body";
@@ -142,32 +144,29 @@ core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will
 
 void InvokeHTTP::initialize() {
   logger_->log_trace("Initializing InvokeHTTP");
-
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(Method);
-  properties.insert(URL);
-  properties.insert(ConnectTimeout);
-  properties.insert(ReadTimeout);
-  properties.insert(DateHeader);
-  properties.insert(AttributesToSend);
-  properties.insert(SSLContext);
-  properties.insert(ProxyHost);
-  properties.insert(ProxyPort);
-  properties.insert(ProxyUsername);
-  properties.insert(ProxyPassword);
-  properties.insert(UseChunkedEncoding);
-  properties.insert(ContentType);
-  properties.insert(SendBody);
-  properties.insert(SendMessageBody);
-  properties.insert(DisablePeerVerification);
-  properties.insert(AlwaysOutputResponse);
-  properties.insert(FollowRedirects);
-  properties.insert(PropPutOutputAttributes);
-  properties.insert(PenalizeOnNoRetry);
-
-  setSupportedProperties(properties);
-  // Set the supported relationships
+  setSupportedProperties({
+    Method,
+    URL,
+    ConnectTimeout,
+    ReadTimeout,
+    DateHeader,
+    AttributesToSend,
+    SSLContext,
+    ProxyHost,
+    ProxyPort,
+    ProxyUsername,
+    ProxyPassword,
+    UseChunkedEncoding,
+    ContentType,
+    SendBody,
+    SendMessageBody,
+    DisablePeerVerification,
+    AlwaysOutputResponse,
+    FollowRedirects,
+    PropPutOutputAttributes,
+    PenalizeOnNoRetry,
+    InvalidHTTPHeaderFieldHandlingStrategy
+  });
   setSupportedRelationships({Success, RelResponse, RelFailure, RelRetry, RelNoRetry});
 }
 
@@ -182,7 +181,6 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
-
   if (auto connect_timeout = context->getProperty<core::TimePeriodValue>(ConnectTimeout)) {
     connect_timeout_ms_ =  connect_timeout->getMilliseconds();
   } else {
@@ -190,9 +188,9 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
-  std::string contentTypeStr;
-  if (context->getProperty(ContentType.getName(), contentTypeStr)) {
-    content_type_ = contentTypeStr;
+  std::string content_type_str;
+  if (context->getProperty(ContentType.getName(), content_type_str)) {
+    content_type_ = content_type_str;
   }
 
   if (auto read_timeout = context->getProperty<core::TimePeriodValue>(ReadTimeout)) {
@@ -201,12 +199,12 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName(), ReadTimeout.getValue());
   }
 
-  std::string dateHeaderStr;
-  if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) {
+  std::string date_header_str;
+  if (!context->getProperty(DateHeader.getName(), date_header_str)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", DateHeader.getName(), DateHeader.getValue());
   }
 
-  date_header_include_ = utils::StringUtils::toBool(dateHeaderStr).value_or(DateHeader.getValue());
+  date_header_include_ = utils::StringUtils::toBool(date_header_str).value_or(DateHeader.getValue());
 
   if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName(), PropPutOutputAttributes.getValue());
@@ -238,15 +236,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
     }
   }
 
-  std::string useChunkedEncoding = "false";
-  if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) {
+  std::string use_chunked_encoding = "false";
+  if (!context->getProperty(UseChunkedEncoding.getName(), use_chunked_encoding)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName(), UseChunkedEncoding.getValue());
   }
 
-  use_chunked_encoding_ = utils::StringUtils::toBool(useChunkedEncoding).value_or(false);
+  use_chunked_encoding_ = utils::StringUtils::toBool(use_chunked_encoding).value_or(false);
 
-  std::string disablePeerVerification;
-  disable_peer_verification_ = (context->getProperty(DisablePeerVerification.getName(), disablePeerVerification) && utils::StringUtils::toBool(disablePeerVerification).value_or(false));
+  std::string disable_peer_verification;
+  disable_peer_verification_ = (context->getProperty(DisablePeerVerification.getName(), disable_peer_verification) && utils::StringUtils::toBool(disable_peer_verification).value_or(false));
 
   proxy_ = {};
   context->getProperty(ProxyHost.getName(), proxy_.host);
@@ -258,27 +256,35 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
   context->getProperty(ProxyPassword.getName(), proxy_.password);
   context->getProperty(FollowRedirects.getName(), follow_redirects_);
   context->getProperty(SendMessageBody.getName(), send_body_);
-}
 
-InvokeHTTP::~InvokeHTTP() = default;
+  invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<InvalidHTTPHeaderFieldHandlingOption>(*context, InvalidHTTPHeaderFieldHandlingStrategy);
+}
 
-std::string InvokeHTTP::generateId() {
-  return utils::IdGenerator::getIdGenerator()->generate().to_string();
+bool InvokeHTTP::shouldEmitFlowFile() const {
+  return ("POST" == method_ || "PUT" == method_ || "PATCH" == method_);
 }
 
-bool InvokeHTTP::emitFlowFile(const std::string &method) {
-  return ("POST" == method || "PUT" == method || "PATCH" == method);
+std::optional<std::map<std::string, std::string>> InvokeHTTP::validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const {
+  std::map<std::string, std::string> result;
+  for (const auto& [attribute_name, attribute_value] : attributes) {
+    if (utils::HTTPClient::isValidHttpHeaderField(attribute_name)) {
+      result.emplace(attribute_name, attribute_value);
+    } else if (invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::TRANSFORM) {
+      result.emplace(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(attribute_name), attribute_value);
+    } else if (invalid_http_header_field_handling_strategy_ == InvalidHTTPHeaderFieldHandlingOption::FAIL) {
+      return std::nullopt;
+    }
+  }
+  return result;
 }
 
 void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  auto flowFile = session->get();
+  auto flow_file = session->get();
 
-  std::string url = url_;
-
-  if (flowFile == nullptr) {
-    if (!emitFlowFile(method_)) {
+  if (flow_file == nullptr) {
+    if (!shouldEmitFlowFile()) {
       logger_->log_debug("InvokeHTTP -- create flow file with  %s", method_);
-      flowFile = session->create();
+      flow_file = session->create();
     } else {
       logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", method_);
       yield();
@@ -291,11 +297,11 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   logger_->log_debug("onTrigger InvokeHTTP with %s to %s", method_, url_);
 
   // create a transaction id
-  std::string tx_id = generateId();
+  std::string tx_id = utils::IdGenerator::getIdGenerator()->generate().to_string();
 
-  // Note: callback must be declared before callbackObj so that they are destructed in the correct order
-  std::unique_ptr<utils::ByteInputCallback> callback = nullptr;
-  std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
+  // Note: callback must be declared before callback_obj so that they are destructed in the correct order
+  std::unique_ptr<utils::ByteInputCallback> callback;
+  std::unique_ptr<utils::HTTPUploadCallback> callback_obj;
 
   // Client declared after the callbacks to make sure the callbacks are still available when the client is destructed
   utils::HTTPClient client(url_, ssl_context_service_);
@@ -320,25 +326,25 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
 
   client.setHTTPProxy(proxy_);
 
-  if (emitFlowFile(method_)) {
+  if (shouldEmitFlowFile()) {
     logger_->log_trace("InvokeHTTP -- reading flowfile");
-    std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
+    std::shared_ptr<ResourceClaim> claim = flow_file->getResourceClaim();
     if (claim) {
       callback = std::make_unique<utils::ByteInputCallback>();
       if (send_body_) {
-        session->read(flowFile, std::ref(*callback));
+        session->read(flow_file, std::ref(*callback));
       }
-      callbackObj = std::make_unique<utils::HTTPUploadCallback>();
-      callbackObj->ptr = callback.get();
-      callbackObj->pos = 0;
+      callback_obj = std::make_unique<utils::HTTPUploadCallback>();
+      callback_obj->ptr = callback.get();
+      callback_obj->pos = 0;
       logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
       if (!send_body_) {
         client.appendHeader("Content-Length", "0");
       } else if (!use_chunked_encoding_) {
-        client.appendHeader("Content-Length", std::to_string(flowFile->getSize()));
+        client.appendHeader("Content-Length", std::to_string(flow_file->getSize()));
       }
-      client.setUploadCallback(callbackObj.get());
-      client.setSeekFunction(callbackObj.get());
+      client.setUploadCallback(callback_obj.get());
+      client.setSeekFunction(callback_obj.get());
     } else {
       logger_->log_error("InvokeHTTP -- no resource claim");
     }
@@ -348,14 +354,19 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   }
 
   // append all headers
-  client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes());
+  auto attributes_in_headers = validateAttributesAgainstHTTPHeaderRules(flow_file->getAttributes());
+  if (!attributes_in_headers) {
+    session->transfer(flow_file, RelFailure);
+    return;
+  }
+  client.build_header_list(attribute_to_send_regex_, *attributes_in_headers);
 
   logger_->log_trace("InvokeHTTP -- curl performed");
   if (client.submit()) {
     logger_->log_trace("InvokeHTTP -- curl successful");
 
-    bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
-    if (putToAttribute) {
+    bool put_to_attribute = !IsNullOrEmpty(put_attribute_name_);
+    if (put_to_attribute) {
       logger_->log_debug("Adding http response body to flow file attribute %s", put_attribute_name_);
     }
 
@@ -364,21 +375,21 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
 
     int64_t http_code = client.getResponseCode();
     const char *content_type = client.getContentType();
-    flowFile->addAttribute(STATUS_CODE, std::to_string(http_code));
+    flow_file->addAttribute(STATUS_CODE, std::to_string(http_code));
     if (!response_headers.empty())
-      flowFile->addAttribute(STATUS_MESSAGE, response_headers.at(0));
-    flowFile->addAttribute(REQUEST_URL, url_);
-    flowFile->addAttribute(TRANSACTION_ID, tx_id);
+      flow_file->addAttribute(STATUS_MESSAGE, response_headers.at(0));
+    flow_file->addAttribute(REQUEST_URL, url_);
+    flow_file->addAttribute(TRANSACTION_ID, tx_id);
 
-    bool isSuccess = (static_cast<int32_t>(http_code / 100) == 2);
-    bool output_body_to_content = isSuccess && !putToAttribute;
+    bool is_success = (static_cast<int32_t>(http_code / 100) == 2);
+    bool output_body_to_content = is_success && !put_to_attribute;
 
-    logger_->log_debug("isSuccess: %d, response code %" PRId64, isSuccess, http_code);
+    logger_->log_debug("isSuccess: %d, response code %" PRId64, is_success, http_code);
     std::shared_ptr<core::FlowFile> response_flow = nullptr;
 
     if (output_body_to_content) {
-      if (flowFile != nullptr) {
-        response_flow = session->create(flowFile);
+      if (flow_file != nullptr) {
+        response_flow = session->create(flow_file);
       } else {
         response_flow = session->create();
       }
@@ -389,47 +400,47 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
       response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
       if (!response_headers.empty())
         response_flow->addAttribute(STATUS_MESSAGE, response_headers.at(0));
-      response_flow->addAttribute(REQUEST_URL, url);
+      response_flow->addAttribute(REQUEST_URL, url_);
       response_flow->addAttribute(TRANSACTION_ID, tx_id);
       io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
       // need an import from the data stream.
       session->importFrom(stream, response_flow);
     }
-    route(flowFile, response_flow, session, context, isSuccess, http_code);
+    route(flow_file, response_flow, session, context, is_success, http_code);
   } else {
-    session->penalize(flowFile);
-    session->transfer(flowFile, RelFailure);
+    session->penalize(flow_file);
+    session->transfer(flow_file, RelFailure);
   }
 }
 
 void InvokeHTTP::route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
-                       const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode) {
+                       const std::shared_ptr<core::ProcessContext> &context, bool is_success, int64_t status_code) {
   // check if we should yield the processor
-  if (!isSuccess && request == nullptr) {
+  if (!is_success && request == nullptr) {
     context->yield();
   }
 
   // If the property to output the response flowfile regardless of status code is set then transfer it
-  bool responseSent = false;
+  bool response_sent = false;
   if (always_output_response_ && response != nullptr) {
     logger_->log_debug("Outputting success and response");
     session->transfer(response, RelResponse);
-    responseSent = true;
+    response_sent = true;
   }
 
   // transfer to the correct relationship
   // 2xx -> SUCCESS
-  if (isSuccess) {
+  if (is_success) {
     // we have two flowfiles to transfer
     if (request != nullptr) {
       session->transfer(request, Success);
     }
-    if (response != nullptr && !responseSent) {
+    if (response != nullptr && !response_sent) {
       logger_->log_debug("Outputting success and response");
       session->transfer(response, RelResponse);
     }
     // 5xx -> RETRY
-  } else if (statusCode / 100 == 5) {
+  } else if (status_code / 100 == 5) {
     if (request != nullptr) {
       session->penalize(request);
       session->transfer(request, RelRetry);
@@ -449,8 +460,4 @@ REGISTER_RESOURCE(InvokeHTTP, "An HTTP client processor which can interact with
     "The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers and the "
     "FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h
index 08c89c604..bff7895b9 100644
--- a/extensions/http-curl/processors/InvokeHTTP.h
+++ b/extensions/http-curl/processors/InvokeHTTP.h
@@ -21,6 +21,7 @@
 #include <curl/curl.h>
 #include <memory>
 #include <string>
+#include <map>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -31,30 +32,24 @@
 #include "utils/Id.h"
 #include "../client/HTTPClient.h"
 #include "utils/Export.h"
+#include "utils/Enum.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
-// InvokeHTTP Class
 class InvokeHTTP : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
+  SMART_ENUM(InvalidHTTPHeaderFieldHandlingOption,
+    (FAIL, "fail"),
+    (TRANSFORM, "transform"),
+    (DROP, "drop")
+  )
+
   explicit InvokeHTTP(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid) {
     setTriggerWhenEmpty(true);
   }
-  // Destructor
-  virtual ~InvokeHTTP();
-  // Processor Name
-  EXTENSIONAPI static const char *ProcessorName;
   EXTENSIONAPI static std::string DefaultContentType;
-  // Supported Properties
+
   EXTENSIONAPI static core::Property Method;
   EXTENSIONAPI static core::Property URL;
   EXTENSIONAPI static core::Property ConnectTimeout;
@@ -73,10 +68,9 @@ class InvokeHTTP : public core::Processor {
   EXTENSIONAPI static core::Property UseChunkedEncoding;
   EXTENSIONAPI static core::Property DisablePeerVerification;
   EXTENSIONAPI static core::Property PropPutOutputAttributes;
-
   EXTENSIONAPI static core::Property AlwaysOutputResponse;
-
   EXTENSIONAPI static core::Property PenalizeOnNoRetry;
+  EXTENSIONAPI static core::Property InvalidHTTPHeaderFieldHandlingStrategy;
 
   EXTENSIONAPI static const char* STATUS_CODE;
   EXTENSIONAPI static const char* STATUS_MESSAGE;
@@ -86,7 +80,7 @@ class InvokeHTTP : public core::Processor {
   EXTENSIONAPI static const char* REMOTE_DN;
   EXTENSIONAPI static const char* EXCEPTION_CLASS;
   EXTENSIONAPI static const char* EXCEPTION_MESSAGE;
-  // Supported Relationships
+
   EXTENSIONAPI static core::Relationship Success;
   EXTENSIONAPI static core::Relationship RelResponse;
   EXTENSIONAPI static core::Relationship RelRetry;
@@ -96,20 +90,8 @@ class InvokeHTTP : public core::Processor {
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  /**
-   * Provides a reference to the URL.
-   */
-  const std::string &getUrl() {
-    return url_;
-  }
-
- protected:
-  /**
-   * Generate a transaction ID
-   * @return transaction ID string.
-   */
-  std::string generateId();
 
+ private:
   /**
    * Routes the flowfile to the proper destination
    * @param request request flow file record
@@ -121,49 +103,29 @@ class InvokeHTTP : public core::Processor {
    */
   void route(const std::shared_ptr<core::FlowFile> &request, const std::shared_ptr<core::FlowFile> &response, const std::shared_ptr<core::ProcessSession> &session,
              const std::shared_ptr<core::ProcessContext> &context, bool isSuccess, int64_t statusCode);
-  /**
-   * Determine if we should emit a new flowfile based on our activity
-   * @param method method type
-   * @return result of the evaluation.
-   */
-  bool emitFlowFile(const std::string &method);
-
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_{nullptr};
+  bool shouldEmitFlowFile() const;
+  std::optional<std::map<std::string, std::string>> validateAttributesAgainstHTTPHeaderRules(const std::map<std::string, std::string>& attributes) const;
 
-  // http method
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
   std::string method_;
-  // url
   std::string url_;
-  // include date in the header
   bool date_header_include_{true};
-  // attribute to send regex
   std::string attribute_to_send_regex_;
-  // connection timeout
   std::chrono::milliseconds connect_timeout_ms_{20000};
-  // read timeout.
   std::chrono::milliseconds read_timeout_ms_{20000};
   // attribute in which response body will be added
   std::string put_attribute_name_;
-  // determine if we always output a response.
   bool always_output_response_{false};
-  // content type.
   std::string content_type_;
-  // use chunked encoding.
   bool use_chunked_encoding_{false};
-  // penalize on no retry
   bool penalize_no_retry_{false};
-  // disable peer verification ( makes susceptible for MITM attacks )
+  // disabling peer verification makes the connection susceptible to MITM attacks
   bool disable_peer_verification_{false};
   utils::HTTPProxy proxy_;
   bool follow_redirects_{true};
   bool send_body_{true};
-
- private:
+  InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger()};
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/http-curl/tests/unit/HTTPClientTests.cpp b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
index ac6d78ffe..1101b4ef4 100644
--- a/extensions/http-curl/tests/unit/HTTPClientTests.cpp
+++ b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
@@ -95,3 +95,21 @@ TEST_CASE("HTTPClient escape test") {
   CHECK(client.escape("Hello Günter") == "Hello%20G%C3%BCnter");
   CHECK(client.escape("шеллы") == "%D1%88%D0%B5%D0%BB%D0%BB%D1%8B");
 }
+
+TEST_CASE("HTTPClient isValidHttpHeaderField test") {
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(""));
+  CHECK(utils::HTTPClient::isValidHttpHeaderField("valid"));
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(" "));
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(std::string("invalid") + static_cast<char>(11) + "character"));
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField(std::string("invalid") + static_cast<char>(128) + "character"));
+  CHECK_FALSE(utils::HTTPClient::isValidHttpHeaderField("contains:invalid"));
+}
+
+TEST_CASE("HTTPClient replaceInvalidCharactersInHttpHeaderFieldName test") {
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("") == "X-MiNiFi-Empty-Attribute-Name");
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("valid") == "valid");
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(" ") == "-");
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string("invalid") + static_cast<char>(11) + "character") == "invalid-character");
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string("invalid") + static_cast<char>(128) + "character") == "invalid-character");
+  CHECK(utils::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName("contains:invalid") == "contains-invalid");
+}
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index b8213c53b..032e5cee7 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -34,27 +34,41 @@
 #include "core/ProcessorNode.h"
 #include "processors/LogAttribute.h"
 #include "utils/gsl.h"
-#include "processors/GenerateFlowFile.h"
+#include "SingleProcessorTestController.h"
+
+namespace org::apache::nifi::minifi::test {
 
-namespace {
 class TestHTTPServer {
  public:
   TestHTTPServer();
   static constexpr const char* PROCESSOR_NAME = "my_http_server";
   static constexpr const char* URL = "http://localhost:8681/testytesttest";
 
+  void trigger() {
+    LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+    LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+    test_plan_->reset();
+    test_controller_.runSession(test_plan_);
+  }
+
  private:
   TestController test_controller_;
+  std::shared_ptr<core::Processor> listen_http_;
+  std::shared_ptr<core::Processor> log_attribute_;
   std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
 };
 
 TestHTTPServer::TestHTTPServer() {
-  std::shared_ptr<core::Processor> listen_http = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
-  test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest");
-  test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+
+  listen_http_ = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
+  log_attribute_ = test_plan_->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true);
+  test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "testytesttest");
+  test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+  test_plan_->setProperty(listen_http_, org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex.getName(), ".*");
   test_controller_.runSession(test_plan_);
 }
-}  // namespace
 
 TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   TestController testController;
@@ -251,7 +265,7 @@ TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
   std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
 
   plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
-  plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+  plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), "http://localhost:8681/invalid");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
 
   constexpr const char* PENALIZE_LOG_PATTERN = "Penalizing [0-9a-f-]+ for [0-9]+ms at invokehttp";
@@ -289,3 +303,67 @@ TEST_CASE("HTTPTestsPutResponseBodyinAttribute", "[httptest1]") {
 
   REQUIRE(LogTestController::getInstance().contains("Adding http response body to flow file attribute http.type"));
 }
+
+TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in HTTP headers", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  LogTestController::getInstance().setDebug<InvokeHTTP>();
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}});
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::RelFailure);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+}
+
+TEST_CASE("InvokeHTTP replaces invalid characters of attributes", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+  LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"", "value2"}});
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+  http_server.trigger();
+  REQUIRE(LogTestController::getInstance().contains("key:invalid-header value:value"));
+  REQUIRE(LogTestController::getInstance().contains("key:X-MiNiFi-Empty-Attribute-Name value:value2"));
+}
+
+TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers", "[httptest1]") {
+  using minifi::processors::InvokeHTTP;
+  TestHTTPServer http_server;
+
+  auto invokehttp = std::make_shared<InvokeHTTP>("InvokeHTTP");
+  test::SingleProcessorTestController test_controller{invokehttp};
+  LogTestController::getInstance().setTrace<InvokeHTTP>();
+
+  invokehttp->setProperty(InvokeHTTP::Method, "GET");
+  invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
+  invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
+  const auto result = test_controller.trigger();
+  auto file_contents = result.at(InvokeHTTP::Success);
+  REQUIRE(file_contents.size() == 1);
+  REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
+  http_server.trigger();
+  REQUIRE(LogTestController::getInstance().contains("key:legit-header value:value1"));
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
+}
+
+}  // namespace org::apache::nifi::minifi::test


[nifi-minifi-cpp] 01/04: MINIFICPP-1687 Signal error on UUID collision

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6e573f7373f5b36b06b6317978a8b691702257bf
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed May 11 15:28:12 2022 +0200

    MINIFICPP-1687 Signal error on UUID collision
    
    Closes #1316
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../tests/unit/YamlConfigurationTests.cpp          | 64 ++++++++++++++++++++
 libminifi/include/core/yaml/CheckRequiredField.h   |  4 +-
 libminifi/include/core/yaml/YamlConfiguration.h    |  4 ++
 libminifi/src/core/yaml/CheckRequiredField.cpp     | 12 ++--
 libminifi/src/core/yaml/YamlConfiguration.cpp      | 69 +++++++++++++---------
 libminifi/test/resources/TestHTTPSiteToSite.yml    |  8 +--
 6 files changed, 121 insertions(+), 40 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 62b88ce7a..0df17ee62 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -27,6 +27,7 @@
 #include "TestBase.h"
 #include "Catch.h"
 #include "utils/TestUtils.h"
+#include "utils/StringUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -867,3 +868,66 @@ Remote Process Groups: []
     REQUIRE(it.second->getSource());
   }
 }
+
+TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
+  TestController test_controller;
+  std::shared_ptr<core::Repository> test_prov_repo = core::createRepository("provenancerepository", true);
+  std::shared_ptr<core::Repository> test_flow_file_repo = core::createRepository("flowfilerepository", true);
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  core::YamlConfiguration yaml_config(test_prov_repo, test_flow_file_repo, content_repo, stream_factory, configuration);
+
+  for (char i = '1'; i <= '8'; ++i) {
+    DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
+      std::string config_yaml =
+        R"(
+          Flow Controller:
+            name: root
+            comment: ''
+          Processors:
+          - id: 00000000-0000-0000-0000-000000000001
+            name: GenerateFlowFile1
+            class: org.apache.nifi.minifi.processors.GenerateFlowFile
+          - id: 00000000-0000-0000-0000-000000000002
+            name: LogAttribute
+            class: org.apache.nifi.minifi.processors.LogAttribute
+          Funnels:
+          - id: 00000000-0000-0000-0000-000000000003
+          - id: 99999999-9999-9999-9999-999999999999
+          Connections:
+          - id: 00000000-0000-0000-0000-000000000004
+            name: 00000000-0000-0000-0000-000000000003//LogAttribute
+            source id: 00000000-0000-0000-0000-000000000003
+            source relationship names: []
+            destination id: 00000000-0000-0000-0000-000000000002
+          - id: 00000000-0000-0000-0000-000000000005
+            name: GenerateFlowFile1/success/00000000-0000-0000-0000-000000000003
+            source id: 00000000-0000-0000-0000-000000000001
+            source relationship names:
+            - success
+            destination id: 00000000-0000-0000-0000-000000000003
+          Remote Process Groups:
+          - id: 00000000-0000-0000-0000-000000000006
+            name: ''
+            url: http://localhost:8080/nifi
+            transport protocol: RAW
+            Input Ports:
+            - id: 00000000-0000-0000-0000-000000000007
+              name: test2
+              max concurrent tasks: 1
+              use compression: false
+            Output Ports: []
+          Controller Services:
+            - name: SSLContextService
+              id: 00000000-0000-0000-0000-000000000008
+              class: SSLContextService
+            )";
+
+      auto config_old = config_yaml;
+      utils::StringUtils::replaceAll(config_yaml, std::string("00000000-0000-0000-0000-00000000000") + i, "99999999-9999-9999-9999-999999999999");
+      std::istringstream config_yaml_stream(config_yaml);
+      REQUIRE_THROWS_WITH(yaml_config.getYamlRoot(config_yaml_stream), "General Operation: UUID 99999999-9999-9999-9999-999999999999 is duplicated in the flow configuration");
+    }
+  }
+}
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h b/libminifi/include/core/yaml/CheckRequiredField.h
index e6e1b500f..0967a723b 100644
--- a/libminifi/include/core/yaml/CheckRequiredField.h
+++ b/libminifi/include/core/yaml/CheckRequiredField.h
@@ -53,9 +53,9 @@ std::string buildErrorMessage(const YAML::Node &yaml_node, const std::vector<std
  *                               not present in 'yaml_node'
  */
 void checkRequiredField(
-    const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section = "", std::string error_message = "");
+    const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section = "", std::string_view error_message = "");
 
-std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string error_message = {});
+std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string_view error_message = {});
 
 }  // namespace yaml
 }  // namespace core
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index dd6e163ba..0430092c3 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <optional>
 #include <string>
+#include <unordered_set>
 
 #include "core/FlowConfiguration.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -277,6 +278,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @return         the parsed or generated UUID string
    */
   std::string getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField = "id");
+  std::string getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section = "", std::string_view error_message = "");
 
   /**
    * This is a helper function for getting an optional value, if it exists.
@@ -302,9 +304,11 @@ class YamlConfiguration : public FlowConfiguration {
   void parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
   void parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
   void parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
+  void addNewId(const std::string& uuid);
 
   std::shared_ptr<logging::Logger> logger_;
   static std::shared_ptr<utils::IdGenerator> id_generator_;
+  std::unordered_set<std::string> uuids_;
 
   /**
    * Raises a human-readable configuration error for the given configuration component/section.
diff --git a/libminifi/src/core/yaml/CheckRequiredField.cpp b/libminifi/src/core/yaml/CheckRequiredField.cpp
index 493e301c5..bb5b7abf8 100644
--- a/libminifi/src/core/yaml/CheckRequiredField.cpp
+++ b/libminifi/src/core/yaml/CheckRequiredField.cpp
@@ -51,25 +51,25 @@ std::string buildErrorMessage(const YAML::Node &yaml_node, const std::vector<std
   return err_msg;
 }
 
-void checkRequiredField(const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section, std::string error_message) {
+void checkRequiredField(const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section, std::string_view error_message) {
   if (!isFieldPresent(yaml_node, field_name)) {
     if (error_message.empty()) {
-      error_message = buildErrorMessage(yaml_node, std::vector<std::string>{std::string(field_name)}, yaml_section);
+      throw std::invalid_argument(buildErrorMessage(yaml_node, std::vector<std::string>{std::string(field_name)}, yaml_section));
     }
-    throw std::invalid_argument(error_message);
+    throw std::invalid_argument(std::string(error_message));  // copy the view: string_view::data() is not guaranteed to be null-terminated
   }
 }
 
-std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string error_message) {
+std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string_view error_message) {
   for (const auto& name : alternate_names) {
     if (yaml::isFieldPresent(yaml_node, name)) {
       return yaml_node[name].as<std::string>();
     }
   }
   if (error_message.empty()) {
-    error_message = buildErrorMessage(yaml_node, alternate_names, yaml_section);
+    throw std::invalid_argument(buildErrorMessage(yaml_node, alternate_names, yaml_section));
   }
-  throw std::invalid_argument(error_message);
+  throw std::invalid_argument(std::string(error_message));  // copy the view: string_view::data() is not guaranteed to be null-terminated
 }
 
 }  // namespace yaml
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 299c7c8f9..6dc6537ab 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -124,23 +124,24 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(con
 }
 
 std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
-    YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
-    YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+  uuids_.clear();
+  YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+  YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
 
-    parseControllerServices(controllerServiceNode);
-    // Create the root process group
-    std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
-    parseProvenanceReportingYaml(provenanceReportNode, root.get());
+  parseControllerServices(controllerServiceNode);
+  // Create the root process group
+  std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
+  parseProvenanceReportingYaml(provenanceReportNode, root.get());
 
-    // set the controller services into the root group.
-    for (const auto& controller_service : controller_services_->getAllControllerServices()) {
-      root->addControllerService(controller_service->getName(), controller_service);
-      root->addControllerService(controller_service->getUUIDStr(), controller_service);
-    }
-
-    return root;
+  // set the controller services into the root group.
+  for (const auto& controller_service : controller_services_->getAllControllerServices()) {
+    root->addControllerService(controller_service->getName(), controller_service);
+    root->addControllerService(controller_service->getUUIDStr(), controller_service);
   }
 
+  return root;
+}
+
 void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
   int64_t runDurationNanos = -1;
   utils::Identifier uuid;
@@ -505,7 +506,6 @@ void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServ
     const auto controllerServiceNode = iter.as<YAML::Node>();
     try {
       yaml::checkRequiredField(controllerServiceNode, "name", CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-      yaml::checkRequiredField(controllerServiceNode, "id", CONFIG_YAML_CONTROLLER_SERVICES_KEY);
 
       auto type = yaml::getRequiredField(controllerServiceNode, std::vector<std::string>{"class", "type"}, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
       logger_->log_debug("Using type %s for controller service node", type);
@@ -518,7 +518,7 @@ void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServ
       }
 
       auto name = controllerServiceNode["name"].as<std::string>();
-      auto id = controllerServiceNode["id"].as<std::string>();
+      auto id = getRequiredIdField(controllerServiceNode, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
 
       utils::Identifier uuid;
       uuid = id;
@@ -595,13 +595,12 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
   // Check for required fields
   yaml::checkRequiredField(inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   auto nameStr = inputPortsObj["name"].as<std::string>();
-  yaml::checkRequiredField(inputPortsObj, "id", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
-                     "The field 'id' is required for "
-                         "the port named '" + nameStr + "' in the YAML Config. If this port "
-                         "is an input port for a NiFi Remote Process Group, the port "
-                         "id should match the corresponding id specified in the NiFi configuration. "
-                         "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
-  auto portId = inputPortsObj["id"].as<std::string>();
+  auto portId = getRequiredIdField(inputPortsObj, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
+    "The field 'id' is required for "
+    "the port named '" + nameStr + "' in the YAML Config. If this port "
+    "is an input port for a NiFi Remote Process Group, the port "
+    "id should match the corresponding id specified in the NiFi configuration. "
+    "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
   uuid = portId;
 
   auto port = std::make_unique<minifi::RemoteProcessorGroupPort>(
@@ -873,14 +872,21 @@ std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const
   if (node[idField]) {
     if (YAML::NodeType::Scalar == node[idField].Type()) {
       id = node[idField].as<std::string>();
-    } else {
-      throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node "
-                                  "of YAML::NodeType::Scalar.");
+      addNewId(id);
+      return id;
     }
-  } else {
-    id = id_generator_->generate().to_string();
-    logger_->log_debug("Generating random ID: id => [%s]", id);
+    throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node of YAML::NodeType::Scalar.");
   }
+
+  id = id_generator_->generate().to_string();
+  logger_->log_debug("Generating random ID: id => [%s]", id);
+  return id;
+}
+
+std::string YamlConfiguration::getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section, std::string_view error_message) {
+  yaml::checkRequiredField(yaml_node, "id", yaml_section, error_message);
+  auto id = yaml_node["id"].as<std::string>();
+  addNewId(id);
   return id;
 }
 
@@ -908,6 +914,13 @@ YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const
   return result;
 }
 
+void YamlConfiguration::addNewId(const std::string& uuid) {
+  const auto [_, success] = uuids_.insert(uuid);
+  if (!success) {
+    throw Exception(ExceptionType::GENERAL_EXCEPTION, "UUID " + uuid + " is duplicated in the flow configuration");
+  }
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/test/resources/TestHTTPSiteToSite.yml b/libminifi/test/resources/TestHTTPSiteToSite.yml
index 339d8c6d2..5b6baf63c 100644
--- a/libminifi/test/resources/TestHTTPSiteToSite.yml
+++ b/libminifi/test/resources/TestHTTPSiteToSite.yml
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 Flow Controller:
-    id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
+    id: 471deef6-2a6e-4a7d-912a-81cc17e3a209
     name: MiNiFi Flow
 
 Processors:
@@ -47,7 +47,7 @@ Processors:
 Connections:
     - name: GenerateFlowFileS2S
       id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
-      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 
+      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
       source relationship name: success
       destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
       max work queue size: 0
@@ -55,8 +55,8 @@ Connections:
       flowfile expiration: 60 sec
       queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
     - name: GenerateFlowFileS2S
-      id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
-      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a203 
+      id: 471deef6-2a6e-4a7d-912a-81cc17e3a210
+      source id: 471deef6-2a6e-4a7d-912a-81cc17e3a203
       source relationship name: success
       destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
       max work queue size: 0


[nifi-minifi-cpp] 04/04: MINIFICPP-1823 Fix absolute.path output attribute in ListFile

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4a98b0f620d4d08adfcfc4b59f1122e205ffc7cc
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Thu May 5 14:55:33 2022 +0200

    MINIFICPP-1823 Fix absolute.path output attribute in ListFile
    
    Closes #1326
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../features/file_system_operations.feature        | 15 +++++++-
 .../integration/minifi/processors/FetchFile.py     | 25 ++++++++++++
 .../test/integration/minifi/processors/ListFile.py | 25 ++++++++++++
 .../standard-processors/processors/ListFile.cpp    | 45 +++++++++++-----------
 .../standard-processors/processors/ListFile.h      |  1 +
 .../tests/unit/ListFileTests.cpp                   | 15 ++++++--
 6 files changed, 98 insertions(+), 28 deletions(-)

diff --git a/docker/test/integration/features/file_system_operations.feature b/docker/test/integration/features/file_system_operations.feature
index f01922b40..5b157284a 100644
--- a/docker/test/integration/features/file_system_operations.feature
+++ b/docker/test/integration/features/file_system_operations.feature
@@ -1,7 +1,7 @@
-Feature: File system operations are handled by the GetFile and PutFile processors
+Feature: File system operations are handled by the GetFile, PutFile, ListFile and FetchFile processors
   In order to store and access data on the local file system
   As a user of MiNiFi
-  I need to have GetFile and PutFile processors
+  I need to have GetFile, PutFile, ListFile and FetchFile processors
 
   Background:
     Given the content of "/tmp/output" is monitored
@@ -38,3 +38,14 @@ Feature: File system operations are handled by the GetFile and PutFile processor
     And a file with the content "test" is present in "/tmp/input"
     When the MiNiFi instance starts up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
+
+  Scenario: List and fetch files from a directory in a simple flow
+    Given a file with filename "test_file.log" and content "Test message" is present in "/tmp/input"
+    And a file with filename "test_file2.log" and content "Another test message" is present in "/tmp/input"
+    And a ListFile processor with the "Input Directory" property set to "/tmp/input"
+    And a FetchFile processor
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ListFile processor is connected to the FetchFile
+    And the "success" relationship of the FetchFile processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then two flowfiles with the contents "Test message" and "Another test message" are placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/minifi/processors/FetchFile.py b/docker/test/integration/minifi/processors/FetchFile.py
new file mode 100644
index 000000000..24abf6db3
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FetchFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class FetchFile(Processor):
+    def __init__(self):
+        super(FetchFile, self).__init__(
+            'FetchFile',
+            schedule={"scheduling strategy": "EVENT_DRIVEN"},
+            auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/ListFile.py b/docker/test/integration/minifi/processors/ListFile.py
new file mode 100644
index 000000000..5e0d9e6c6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListFile.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class ListFile(Processor):
+    def __init__(self, schedule=None):  # None selects the default 2 sec scheduling period; avoids a shared mutable default dict
+        super(ListFile, self).__init__(
+            'ListFile',
+            schedule={'scheduling period': '2 sec'} if schedule is None else schedule,
+            auto_terminate=['success'])
diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp
index 6387976d3..555e79822 100644
--- a/extensions/standard-processors/processors/ListFile.cpp
+++ b/extensions/standard-processors/processors/ListFile.cpp
@@ -142,39 +142,39 @@ void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
 }
 
 bool ListFile::fileMatchesFilters(const ListedFile& listed_file) {
-  if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.absolute_path)) {
-    logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.absolute_path);
+  if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.full_file_path)) {
+    logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (file_filter_ && !std::regex_match(listed_file.filename, *file_filter_)) {
-    logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (path_filter_ && listed_file.relative_path != "." && !std::regex_match(listed_file.relative_path, *path_filter_)) {
-    logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.absolute_path);
+    logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.full_file_path);
     return false;
   }
 
   auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified());
   if (minimum_file_age_ && file_age < *minimum_file_age_) {
-    logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (maximum_file_age_ && file_age > *maximum_file_age_) {
-    logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (minimum_file_size_ && listed_file.file_size < *minimum_file_size_) {
-    logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
   if (maximum_file_size_ && *maximum_file_size_ < listed_file.file_size) {
-    logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.absolute_path);
+    logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.full_file_path);
     return false;
   }
 
@@ -188,32 +188,32 @@ std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& s
   session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, listed_file.relative_path == "." ?
     std::string(".") + utils::file::FileUtils::get_separator() : listed_file.relative_path + utils::file::FileUtils::get_separator());
   session.putAttribute(flow_file, "file.size", std::to_string(listed_file.file_size));
-  if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.absolute_path, "%Y-%m-%dT%H:%M:%SZ")) {
+  if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.full_file_path, "%Y-%m-%dT%H:%M:%SZ")) {
     session.putAttribute(flow_file, "file.lastModifiedTime", *last_modified_str);
   } else {
     session.putAttribute(flow_file, "file.lastModifiedTime", "");
-    logger_->log_warn("Could not get last modification time of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Could not get last modification time of file '%s'", listed_file.full_file_path);
   }
 
-  if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.absolute_path)) {
+  if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.permissions", *permission_string);
   } else {
-    logger_->log_warn("Failed to get permissions of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Failed to get permissions of file '%s'", listed_file.full_file_path);
     session.putAttribute(flow_file, "file.permissions", "");
   }
 
-  if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.absolute_path)) {
+  if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.owner", *owner);
   } else {
-    logger_->log_warn("Failed to get owner of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Failed to get owner of file '%s'", listed_file.full_file_path);
     session.putAttribute(flow_file, "file.owner", "");
   }
 
 #ifndef WIN32
-  if (auto group = utils::file::FileUtils::get_file_group(listed_file.absolute_path)) {
+  if (auto group = utils::file::FileUtils::get_file_group(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.group", *group);
   } else {
-    logger_->log_warn("Failed to get group of file '%s'", listed_file.absolute_path);
+    logger_->log_warn("Failed to get group of file '%s'", listed_file.full_file_path);
     session.putAttribute(flow_file, "file.group", "");
   }
 #else
@@ -234,18 +234,19 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
   auto file_list = utils::file::FileUtils::list_dir_all(input_directory_, logger_, recurse_subdirectories_);
   for (const auto& [path, filename] : file_list) {
     ListedFile listed_file;
-    listed_file.absolute_path = (std::filesystem::path(path) / filename).string();
+    listed_file.full_file_path = (std::filesystem::path(path) / filename).string();
+    listed_file.absolute_path = path + utils::file::FileUtils::get_separator();
     if (auto relative_path = utils::file::FileUtils::get_relative_path(path, input_directory_)) {
       listed_file.relative_path = *relative_path;
     } else {
-      logger_->log_warn("Failed to get group of file '%s' to input directory '%s'", listed_file.absolute_path, input_directory_);
+      logger_->log_warn("Failed to get relative path of file '%s' to input directory '%s'", listed_file.full_file_path, input_directory_);  // message now names the lookup that actually failed
     }
-    listed_file.file_size = utils::file::FileUtils::file_size(listed_file.absolute_path);
+    listed_file.file_size = utils::file::FileUtils::file_size(listed_file.full_file_path);
     listed_file.filename = filename;
-    if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.absolute_path)) {
+    if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.full_file_path)) {
       listed_file.last_modified_time = *last_modified_time;
     } else {
-      logger_->log_error("Could not get last modification time of file '%s'", listed_file.absolute_path);
+      logger_->log_error("Could not get last modification time of file '%s'", listed_file.full_file_path);
       continue;
     }
 
@@ -254,7 +255,7 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
     }
 
     if (stored_listing_state.wasObjectListedAlready(listed_file)) {
-      logger_->log_debug("File '%s' was already listed.", listed_file.absolute_path);
+      logger_->log_debug("File '%s' was already listed.", listed_file.full_file_path);
       continue;
     }
 
diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h
index 83b3d8606..4223126c6 100644
--- a/extensions/standard-processors/processors/ListFile.h
+++ b/extensions/standard-processors/processors/ListFile.h
@@ -71,6 +71,7 @@ class ListFile : public core::Processor {
     std::string absolute_path;
     std::filesystem::file_time_type last_modified_time;
     std::string relative_path;
+    std::string full_file_path;
     uint64_t file_size = 0;
   };
 
diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp
index 06314f53b..cfef64bbf 100644
--- a/extensions/standard-processors/tests/unit/ListFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp
@@ -26,6 +26,7 @@
 #include "processors/ListFile.h"
 #include "utils/TestUtils.h"
 #include "utils/IntegrationTestUtils.h"
+#include "utils/file/PathUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -100,10 +101,16 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files only once with default
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt"));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt"));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt"));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + empty_file_abs_path_));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + standard_file_abs_path_));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + first_sub_file_abs_path_));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + second_sub_file_abs_path_));
+  std::string file_path;
+  std::string file_name;
+  utils::file::getFileNameAndPath(empty_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+  utils::file::getFileNameAndPath(standard_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+  utils::file::getFileNameAndPath(first_sub_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
+  utils::file::getFileNameAndPath(second_sub_file_abs_path_, file_path, file_name);
+  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + file_path + utils::file::FileUtils::get_separator() + "\n"));
   REQUIRE(LogTestController::getInstance().countOccurrences(std::string("key:path value:.") + utils::file::FileUtils::get_separator() + "\n") == 2);
   REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:first_subdir") + utils::file::FileUtils::get_separator()));
   REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:second_subdir") + utils::file::FileUtils::get_separator()));