Posted to commits@nifi.apache.org by sz...@apache.org on 2022/01/04 15:28:22 UTC

[nifi-minifi-cpp] 02/05: MINIFICPP-1290 Create test coverage for OPC processors

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit c4769dd2e862f4716b00359a1a78cbd743afc7b9
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Jan 4 15:57:58 2022 +0100

    MINIFICPP-1290 Create test coverage for OPC processors
    
    - Upgraded the open62541 library to version 1.2.2 and included the required fixes
    - Added integration tests for the put and fetch OPC processors, covering unsecured connections, secure connections, and username/password authentication.
    
    Note: The docker image used for testing is currently a private lordgamez/open62541 image until the changes included in that image are merged and the new image is uploaded to the official open62541 docker repository. The PR for the changes can be checked here: https://github.com/open62541/open62541/pull/4722
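
    For reference, the open62541 1.0 -> 1.2 client API changes this commit adapts to
    boil down to a few renames and a new state query. A minimal sketch distilled from
    the opc.cpp hunks below (variable names are placeholders; not an exhaustive
    migration guide):

        // 1.0: one combined client state enum
        //   if (UA_Client_getState(client) != UA_CLIENTSTATE_DISCONNECTED) { ... }
        // 1.2: channel state, session state and connect status are reported separately
        UA_SecureChannelState channel_state;
        UA_SessionState session_state;
        UA_Client_getState(client, &channel_state, &session_state, nullptr);

        // 1.0: UA_Client_connect_username(...)
        // 1.2: renamed to
        UA_Client_connectUsername(client, url, username, password);

        // 1.0: UA_BrowseResponse_deleteMembers(&response);
        // 1.2: the UA_*_deleteMembers() family is renamed to UA_*_clear()
        UA_BrowseResponse_clear(&response);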
    
    Closes #1216
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .github/workflows/ci.yml                           |   2 +-
 PROCESSORS.md                                      |   4 +-
 cmake/BundledOpen62541.cmake                       |   4 +-
 .../integration/MiNiFi_integration_test_driver.py  |  21 +--
 docker/test/integration/features/opcua.feature     | 155 +++++++++++++++++++++
 .../minifi/core/AzureStorageServerContainer.py     |   7 +-
 docker/test/integration/minifi/core/Container.py   |   3 +-
 .../minifi/core/DockerTestDirectoryBindings.py     |   1 +
 .../integration/minifi/core/FileSystemObserver.py  |  37 ++---
 .../test/integration/minifi/core/FlowContainer.py  |   4 +-
 .../integration/minifi/core/HttpProxyContainer.py  |   7 +-
 .../minifi/core/KafkaBrokerContainer.py            |   7 +-
 .../integration/minifi/core/MinifiContainer.py     |   8 +-
 .../integration/minifi/core/MqttBrokerContainer.py |   7 +-
 .../test/integration/minifi/core/NifiContainer.py  |  13 +-
 .../minifi/core/OPCUAServerContainer.py            |  24 ++++
 .../minifi/core/PostgreSQLServerContainer.py       |   7 +-
 .../integration/minifi/core/S3ServerContainer.py   |   7 +-
 .../minifi/core/SingleNodeDockerCluster.py         |  23 +--
 .../integration/minifi/core/ZookeeperContainer.py  |   7 +-
 .../minifi/processors/FetchOPCProcessor.py         |   8 ++
 .../minifi/processors/PutOPCProcessor.py           |   9 ++
 .../resources/opcua/opcua_client_cert.der          | Bin 0 -> 1018 bytes
 .../resources/opcua/opcua_client_key.der           | Bin 0 -> 1192 bytes
 docker/test/integration/steps/steps.py             |  38 ++++-
 extensions/opc/include/fetchopc.h                  |   4 +
 extensions/opc/include/opc.h                       |   4 +-
 extensions/opc/include/putopc.h                    |   4 +
 extensions/opc/src/opc.cpp                         |  68 ++++-----
 extensions/opc/src/opcbase.cpp                     |  61 ++++----
 extensions/opc/src/putopc.cpp                      |  16 +--
 thirdparty/open62541/open62541.patch               |  50 +++----
 32 files changed, 419 insertions(+), 191 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a1dff22..a933950 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -201,7 +201,7 @@ jobs:
           if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
           mkdir build
           cd build
-          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_OPC=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
           make docker
       - id: install_deps
         run: |
diff --git a/PROCESSORS.md b/PROCESSORS.md
index fb796af..2445d48 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -523,7 +523,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 |**Node ID**|||Specifies the ID of the root node to traverse|
 |**Node ID type**||Int<br>Path<br>String<br>|Specifies the type of the provided node ID|
 |**OPC server endpoint**|||Specifies the address, port and relative path of an OPC endpoint|
-|Password|||Password to log in with. Providing this requires cert and key to be provided as well, credentials are always sent encrypted.|
+|Password|||Password to log in with.|
 |Trusted server certificate path|||Path to the DER-encoded trusted server certificate|
 |Username|||Username to log in with.|
 ### Relationships
@@ -1369,7 +1369,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 |**Parent node ID**|||Specifies the ID of the root node to traverse|
 |**Parent node ID type**||Int<br>Path<br>String<br>|Specifies the type of the provided node ID|
 |Parent node namespace index|0||The index of the namespace. Used only if node ID type is not path.|
-|Password|||Password to log in with. Providing this requires cert and key to be provided as well, credentials are always sent encrypted.|
+|Password|||Password to log in with.|
 |Target node ID|||ID of target node.<br/>**Supports Expression Language: true**|
 |Target node ID type|||ID type of target node. Allowed values are: Int, String.<br/>**Supports Expression Language: true**|
 |Target node browse name|||Browse name of target node. Only used when new node is created.<br/>**Supports Expression Language: true**|
diff --git a/cmake/BundledOpen62541.cmake b/cmake/BundledOpen62541.cmake
index e055d39..c03c0bc 100644
--- a/cmake/BundledOpen62541.cmake
+++ b/cmake/BundledOpen62541.cmake
@@ -40,8 +40,8 @@ function(use_bundled_open62541 SOURCE_DIR BINARY_DIR)
     # Build project
     ExternalProject_Add(
             open62541-external
-            URL "https://github.com/open62541/open62541/archive/v1.0.tar.gz"
-            URL_HASH "SHA256=9be66efefe2cdb07a7638aad91c301b5c6163f99c66995bc41cce31ec0ea207e"
+            URL "https://github.com/open62541/open62541/archive/v1.2.2.tar.gz"
+            URL_HASH "SHA256=9b5bfd811ee523be601f11abc514a93c67fe5c6e957cd6c36fe6ea4f28e009bb"
             SOURCE_DIR "${BINARY_DIR}/thirdparty/open62541-src"
             PATCH_COMMAND ${PC}
             LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index b7e5991..34bc202 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -44,8 +44,8 @@ class MiNiFi_integration_test():
     def docker_path_to_local_path(self, docker_path):
         return self.docker_directory_bindings.docker_path_to_local_path(self.test_id, docker_path)
 
-    def acquire_container(self, name, engine='minifi-cpp'):
-        return self.cluster.acquire_container(name, engine)
+    def acquire_container(self, name, engine='minifi-cpp', command=None):
+        return self.cluster.acquire_container(name, engine, command)
 
     def wait_for_container_startup_to_finish(self, container_name):
         startup_success = self.cluster.wait_for_startup_log(container_name, 120)
@@ -141,7 +141,8 @@ class MiNiFi_integration_test():
     def check_for_at_least_one_file_with_content_generated(self, content, timeout_seconds):
         output_validator = SingleOrMultiFileOutputValidator(decode_escaped_str(content))
         output_validator.set_output_dir(self.file_system_observer.get_output_dir())
-        self.check_output(timeout_seconds, output_validator, 1)
+        expected_number_of_files = timeout_seconds  # generous upper bound: wait_for_output() re-validates after each new file until the timeout
+        self.check_output(timeout_seconds, output_validator, expected_number_of_files)
 
     def check_for_num_files_generated(self, num_flowfiles, timeout_seconds):
         output_validator = NoContentCheckFileNumberValidator(num_flowfiles)
@@ -163,7 +164,8 @@ class MiNiFi_integration_test():
         self.validate(output_validator)
 
     def check_output(self, timeout_seconds, output_validator, max_files):
-        self.file_system_observer.wait_for_output(timeout_seconds, max_files)
+        if self.file_system_observer.wait_for_output(timeout_seconds, max_files, output_validator):
+            return
         self.validate(output_validator)
 
     def validate(self, validator):
@@ -190,17 +192,20 @@ class MiNiFi_integration_test():
         assert self.cluster.wait_for_kafka_consumer_to_be_registered(kafka_container_name)
 
     def check_minifi_log_contents(self, line, timeout_seconds=60):
+        self.check_container_log_contents("minifi-cpp", line, timeout_seconds)
+
+    def check_minifi_log_matches_regex(self, regex, timeout_seconds=60):
         for container in self.cluster.containers.values():
             if container.get_engine() == "minifi-cpp":
-                line_found = self.cluster.wait_for_app_logs(container.get_name(), line, timeout_seconds)
+                line_found = self.cluster.wait_for_app_logs_regex(container.get_name(), regex, timeout_seconds)
                 if line_found:
                     return
         assert False
 
-    def check_minifi_log_matches_regex(self, regex, timeout_seconds=60):
+    def check_container_log_contents(self, container_engine, line, timeout_seconds=60):
         for container in self.cluster.containers.values():
-            if container.get_engine() == "minifi-cpp":
-                line_found = self.cluster.wait_for_app_logs_regex(container.get_name(), regex, timeout_seconds)
+            if container.get_engine() == container_engine:
+                line_found = self.cluster.wait_for_app_logs(container.get_name(), line, timeout_seconds)
                 if line_found:
                     return
         assert False
diff --git a/docker/test/integration/features/opcua.feature b/docker/test/integration/features/opcua.feature
new file mode 100644
index 0000000..8e44aee
--- /dev/null
+++ b/docker/test/integration/features/opcua.feature
@@ -0,0 +1,155 @@
+Feature: Putting data to and fetching data from an OPC UA server
+  In order to send and fetch data from an OPC UA server
+  As a user of MiNiFi
+  I need to have PutOPCProcessor and FetchOPCProcessor
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario Outline: Create and fetch data from an OPC UA node
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "create-opc-ua-node" flow
+    And a file with the content "<Value>" is present in "/tmp/input"
+    And a PutOPCProcessor processor in the "create-opc-ua-node" flow
+    And a FetchOPCProcessor processor in the "fetch-opc-ua-node" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "fetch-opc-ua-node" flow
+    And these processor properties are set:
+      | processor name    | property name               | property value                  |
+      | PutOPCProcessor   | Parent node ID              | 85                              |
+      | PutOPCProcessor   | Parent node ID type         | Int                             |
+      | PutOPCProcessor   | Target node ID              | 9999                            |
+      | PutOPCProcessor   | Target node ID type         | Int                             |
+      | PutOPCProcessor   | Target node namespace index | 1                               |
+      | PutOPCProcessor   | Value type                  | <Value Type>                    |
+      | PutOPCProcessor   | OPC server endpoint         | opc.tcp://opcua-server:4840/    |
+      | PutOPCProcessor   | Target node browse name     | testnodename                    |
+      | FetchOPCProcessor | Node ID                     | 9999                            |
+      | FetchOPCProcessor | Node ID type                | Int                             |
+      | FetchOPCProcessor | Namespace index             | 1                               |
+      | FetchOPCProcessor | OPC server endpoint         | opc.tcp://opcua-server:4840/    |
+      | FetchOPCProcessor | Max depth                   | 1                               |
+
+    And the "success" relationship of the GetFile processor is connected to the PutOPCProcessor
+    And the "success" relationship of the FetchOPCProcessor processor is connected to the PutFile
+
+    And an OPC UA server is set up
+
+    When all instances start up
+    Then at least one flowfile with the content "<Value>" is placed in the monitored directory in less than 60 seconds
+
+  Examples: Value types and values to test
+    | Value Type   | Value   |
+    | String       | Test    |
+    | UInt32       | 42      |
+    | Double       | 123.321 |
+    | Boolean      | False   |
+
+  Scenario Outline: Update and fetch data from an OPC UA node
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "update-opc-ua-node" flow
+    And a file with the content "<Value>" is present in "/tmp/input"
+    And a PutOPCProcessor processor in the "update-opc-ua-node" flow
+    And a FetchOPCProcessor processor in the "fetch-opc-ua-node" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "fetch-opc-ua-node" flow
+    And these processor properties are set:
+      | processor name    | property name               | property value                  |
+      | PutOPCProcessor   | Parent node ID              | 85                              |
+      | PutOPCProcessor   | Parent node ID type         | Int                             |
+      | PutOPCProcessor   | Target node ID              | <Node ID>                       |
+      | PutOPCProcessor   | Target node ID type         | <Node ID Type>                  |
+      | PutOPCProcessor   | Target node namespace index | 1                               |
+      | PutOPCProcessor   | Value type                  | <Value Type>                    |
+      | PutOPCProcessor   | OPC server endpoint         | opc.tcp://opcua-server:4840/    |
+      | PutOPCProcessor   | Target node browse name     | testnodename                    |
+      | FetchOPCProcessor | Node ID                     | <Node ID>                       |
+      | FetchOPCProcessor | Node ID type                | <Node ID Type>                  |
+      | FetchOPCProcessor | Namespace index             | 1                               |
+      | FetchOPCProcessor | OPC server endpoint         | opc.tcp://opcua-server:4840/    |
+      | FetchOPCProcessor | Max depth                   | 1                               |
+
+    And the "success" relationship of the GetFile processor is connected to the PutOPCProcessor
+    And the "success" relationship of the FetchOPCProcessor processor is connected to the PutFile
+
+    And an OPC UA server is set up
+
+    When all instances start up
+    Then at least one flowfile with the content "<Value>" is placed in the monitored directory in less than 60 seconds
+
+  # Node IDs starting from 51000 are pre-defined demo node IDs in the test server application (server_ctt) of the open62541 docker image. There is one node ID defined
+  # for each type supported by OPC UA. These demo nodes can be used for testing purposes. "the.answer" is a pre-defined string node ID serving the same purpose.
+  Examples: Node IDs, value types and values to test
+    | Node ID Type | Node ID     | Value Type   | Value       |
+    | Int          | 51034       | String       | minifi-test |
+    | Int          | 51001       | Boolean      | True        |
+    | String       | the.answer  | Int32        | 54          |
+    | Int          | 51019       | UInt32       | 123         |
+    | Int          | 51031       | Double       | 66.6        |
+
+  Scenario: Create and fetch data from an OPC UA node through secure connection
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "create-opc-ua-node" flow
+    And a file with the content "Test" is present in "/tmp/input"
+    And a PutOPCProcessor processor in the "create-opc-ua-node" flow
+    And a FetchOPCProcessor processor in the "fetch-opc-ua-node" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "fetch-opc-ua-node" flow
+    And these processor properties are set:
+      | processor name    | property name               | property value                             |
+      | PutOPCProcessor   | Parent node ID              | 85                                         |
+      | PutOPCProcessor   | Parent node ID type         | Int                                        |
+      | PutOPCProcessor   | Target node ID              | 9999                                       |
+      | PutOPCProcessor   | Target node ID type         | Int                                        |
+      | PutOPCProcessor   | Target node namespace index | 1                                          |
+      | PutOPCProcessor   | Value type                  | String                                     |
+      | PutOPCProcessor   | OPC server endpoint         | opc.tcp://opcua-server:4840/               |
+      | PutOPCProcessor   | Target node browse name     | testnodename                               |
+      | PutOPCProcessor   | Certificate path            | /tmp/resources/opcua/opcua_client_cert.der |
+      | PutOPCProcessor   | Key path                    | /tmp/resources/opcua/opcua_client_key.der  |
+      | PutOPCProcessor   | Application URI             | urn:open62541.server.application           |
+      | FetchOPCProcessor | Node ID                     | 9999                                       |
+      | FetchOPCProcessor | Node ID type                | Int                                        |
+      | FetchOPCProcessor | Namespace index             | 1                                          |
+      | FetchOPCProcessor | OPC server endpoint         | opc.tcp://opcua-server:4840/               |
+      | FetchOPCProcessor | Max depth                   | 1                                          |
+      | FetchOPCProcessor | Certificate path            | /tmp/resources/opcua/opcua_client_cert.der |
+      | FetchOPCProcessor | Key path                    | /tmp/resources/opcua/opcua_client_key.der  |
+      | FetchOPCProcessor | Application URI             | urn:open62541.server.application           |
+
+    And the "success" relationship of the GetFile processor is connected to the PutOPCProcessor
+    And the "success" relationship of the FetchOPCProcessor processor is connected to the PutFile
+
+    And an OPC UA server is set up
+
+    When all instances start up
+    Then at least one flowfile with the content "Test" is placed in the monitored directory in less than 60 seconds
+    And the OPC UA server logs contain the following message: "SecureChannel opened with SecurityPolicy http://opcfoundation.org/UA/SecurityPolicy#Aes128_Sha256_RsaOaep" in less than 5 seconds
+
+  Scenario: Create and fetch data from an OPC UA node through username and password authenticated connection
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "create-opc-ua-node" flow
+    And a file with the content "Test" is present in "/tmp/input"
+    And a PutOPCProcessor processor in the "create-opc-ua-node" flow
+    And a FetchOPCProcessor processor in the "fetch-opc-ua-node" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "fetch-opc-ua-node" flow
+    And these processor properties are set:
+      | processor name    | property name               | property value                |
+      | PutOPCProcessor   | Parent node ID              | 85                            |
+      | PutOPCProcessor   | Parent node ID type         | Int                           |
+      | PutOPCProcessor   | Target node ID              | 9999                          |
+      | PutOPCProcessor   | Target node ID type         | Int                           |
+      | PutOPCProcessor   | Target node namespace index | 1                             |
+      | PutOPCProcessor   | Value type                  | String                        |
+      | PutOPCProcessor   | OPC server endpoint         | opc.tcp://opcua-server:4840/  |
+      | PutOPCProcessor   | Target node browse name     | testnodename                  |
+      | PutOPCProcessor   | Username                    | peter                         |
+      | PutOPCProcessor   | Password                    | peter123                      |
+      | FetchOPCProcessor | Node ID                     | 9999                          |
+      | FetchOPCProcessor | Node ID type                | Int                           |
+      | FetchOPCProcessor | Namespace index             | 1                             |
+      | FetchOPCProcessor | OPC server endpoint         | opc.tcp://opcua-server:4840/  |
+      | FetchOPCProcessor | Max depth                   | 1                             |
+      | FetchOPCProcessor | Username                    | peter                         |
+      | FetchOPCProcessor | Password                    | peter123                      |
+
+    And the "success" relationship of the GetFile processor is connected to the PutOPCProcessor
+    And the "success" relationship of the FetchOPCProcessor processor is connected to the PutFile
+
+    And an OPC UA server is set up with access control
+
+    When all instances start up
+    Then at least one flowfile with the content "Test" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/AzureStorageServerContainer.py b/docker/test/integration/minifi/core/AzureStorageServerContainer.py
index 6e8645d..798a04d 100644
--- a/docker/test/integration/minifi/core/AzureStorageServerContainer.py
+++ b/docker/test/integration/minifi/core/AzureStorageServerContainer.py
@@ -3,8 +3,8 @@ from .Container import Container
 
 
 class AzureStorageServerContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 'azure-storage-server', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'azure-storage-server', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Azurite Queue service is successfully listening at"
@@ -19,5 +19,6 @@ class AzureStorageServerContainer(Container):
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'10000/tcp': 10000, '10001/tcp': 10001})
+            ports={'10000/tcp': 10000, '10001/tcp': 10001},
+            entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/Container.py b/docker/test/integration/minifi/core/Container.py
index 8e0b41c..1457230 100644
--- a/docker/test/integration/minifi/core/Container.py
+++ b/docker/test/integration/minifi/core/Container.py
@@ -3,12 +3,13 @@ import logging
 
 
 class Container:
-    def __init__(self, name, engine, vols, network, image_store):
+    def __init__(self, name, engine, vols, network, image_store, command):
         self.name = name
         self.engine = engine
         self.vols = vols
         self.network = network
         self.image_store = image_store
+        self.command = command
 
         # Get docker client
         self.client = docker.from_env()
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 1aed63a..1673327 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -25,6 +25,7 @@ class DockerTestDirectoryBindings:
         test_dir = os.environ['TEST_DIRECTORY']  # Based on DockerVerify.sh
         shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
         shutil.copytree(test_dir + "/resources/python", self.data_directories[test_id]["resources_dir"] + "/python")
+        shutil.copytree(test_dir + "/resources/opcua", self.data_directories[test_id]["resources_dir"] + "/opcua")
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py b/docker/test/integration/minifi/core/FileSystemObserver.py
index 0411bf5..329acc6 100644
--- a/docker/test/integration/minifi/core/FileSystemObserver.py
+++ b/docker/test/integration/minifi/core/FileSystemObserver.py
@@ -31,21 +31,26 @@ class FileSystemObserver(object):
         self.observer.schedule(self.event_handler, self.test_output_dir, recursive=True)
         self.observer.start()
 
-    def wait_for_output(self, timeout_seconds, max_files):
+    def wait_for_output(self, timeout_seconds, max_files, output_validator):
         logging.info('Waiting up to %d seconds for %d test outputs...', timeout_seconds, max_files)
         self.restart_observer_if_needed()
-        if max_files <= self.event_handler.get_num_files_created():
-            return
-        wait_start_time = time.perf_counter()
-        for _ in range(0, max_files):
-            # Note: The timing on Event.wait() is inaccurate
-            self.done_event.wait(timeout_seconds - time.perf_counter() + wait_start_time)
-            if self.done_event.isSet():
-                self.done_event.clear()
-                if max_files <= self.event_handler.get_num_files_created():
-                    self.done_event.set()
-                    return
-            if timeout_seconds < (time.perf_counter() - wait_start_time):
-                break
-        self.observer.stop()
-        self.observer.join()
+        try:
+            if max_files <= self.event_handler.get_num_files_created():
+                return False
+            wait_start_time = time.perf_counter()
+            for _ in range(0, max_files):
+                # Note: The timing on Event.wait() is inaccurate
+                self.done_event.wait(timeout_seconds - time.perf_counter() + wait_start_time)
+                if self.done_event.isSet():
+                    self.done_event.clear()
+                    if max_files <= self.event_handler.get_num_files_created():
+                        self.done_event.set()
+                        return False
+                    if output_validator.validate():
+                        return True
+                if timeout_seconds < (time.perf_counter() - wait_start_time):
+                    break
+            return False
+        finally:
+            self.observer.stop()
+            self.observer.join()
diff --git a/docker/test/integration/minifi/core/FlowContainer.py b/docker/test/integration/minifi/core/FlowContainer.py
index f8dfc23..d8eaffa 100644
--- a/docker/test/integration/minifi/core/FlowContainer.py
+++ b/docker/test/integration/minifi/core/FlowContainer.py
@@ -2,8 +2,8 @@ from .Container import Container
 
 
 class FlowContainer(Container):
-    def __init__(self, config_dir, name, engine, vols, network, image_store):
-        super().__init__(name, engine, vols, network, image_store)
+    def __init__(self, config_dir, name, engine, vols, network, image_store, command):
+        super().__init__(name, engine, vols, network, image_store, command)
         self.start_nodes = []
         self.config_dir = config_dir
 
diff --git a/docker/test/integration/minifi/core/HttpProxyContainer.py b/docker/test/integration/minifi/core/HttpProxyContainer.py
index 98fe2a9..79e5872 100644
--- a/docker/test/integration/minifi/core/HttpProxyContainer.py
+++ b/docker/test/integration/minifi/core/HttpProxyContainer.py
@@ -3,8 +3,8 @@ from .Container import Container
 
 
 class HttpProxyContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 'http-proxy', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'http-proxy', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Accepting HTTP Socket connections at"
@@ -19,5 +19,6 @@ class HttpProxyContainer(Container):
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'3128/tcp': 3128})
+            ports={'3128/tcp': 3128},
+            entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/KafkaBrokerContainer.py b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
index 64346c3..fcc6647 100644
--- a/docker/test/integration/minifi/core/KafkaBrokerContainer.py
+++ b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
@@ -3,8 +3,8 @@ from .Container import Container
 
 
 class KafkaBrokerContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 'kafka-broker', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'kafka-broker', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Kafka startTimeMs"
@@ -30,5 +30,6 @@ class KafkaBrokerContainer(Container):
                 "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093,SASL_PLAINTEXT://kafka-broker:9094,SASL_PLAINTEXT_HOST://localhost:29094,SASL_SSL://kafka-broker:9095,SASL_SSL_HOST://localhost:29095",
                 "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
                 "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
-                "SSL_CLIENT_AUTH=none"])
+                "SSL_CLIENT_AUTH=none"],
+            entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/MinifiContainer.py b/docker/test/integration/minifi/core/MinifiContainer.py
index c1b3758..8b971bf 100644
--- a/docker/test/integration/minifi/core/MinifiContainer.py
+++ b/docker/test/integration/minifi/core/MinifiContainer.py
@@ -8,8 +8,10 @@ class MinifiContainer(FlowContainer):
     MINIFI_VERSION = os.environ['MINIFI_VERSION']
     MINIFI_ROOT = '/opt/minifi/nifi-minifi-cpp-' + MINIFI_VERSION
 
-    def __init__(self, config_dir, name, vols, network, image_store):
-        super().__init__(config_dir, name, 'minifi-cpp', vols, network, image_store)
+    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+        if not command:
+            command = ["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml " + MinifiContainer.MINIFI_ROOT + "/conf && /opt/minifi/minifi-current/bin/minifi.sh run"]
+        super().__init__(config_dir, name, 'minifi-cpp', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Starting Flow Controller"
@@ -36,6 +38,6 @@ class MinifiContainer(FlowContainer):
             detach=True,
             name=self.name,
             network=self.network.name,
-            entrypoint=["/bin/sh", "-c", "cp /tmp/minifi_config/config.yml " + MinifiContainer.MINIFI_ROOT + "/conf && /opt/minifi/minifi-current/bin/minifi.sh run"],
+            entrypoint=self.command,
             volumes=self.vols)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/MqttBrokerContainer.py b/docker/test/integration/minifi/core/MqttBrokerContainer.py
index 51632b4..7900e27 100644
--- a/docker/test/integration/minifi/core/MqttBrokerContainer.py
+++ b/docker/test/integration/minifi/core/MqttBrokerContainer.py
@@ -3,8 +3,8 @@ from .Container import Container
 
 
 class MqttBrokerContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 'mqtt-broker', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'mqtt-broker', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "mosquitto version [0-9\\.]+ running"
@@ -19,5 +19,6 @@ class MqttBrokerContainer(Container):
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'1883/tcp': 1883})
+            ports={'1883/tcp': 1883},
+            entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/NifiContainer.py b/docker/test/integration/minifi/core/NifiContainer.py
index 6475133..42bdf48 100644
--- a/docker/test/integration/minifi/core/NifiContainer.py
+++ b/docker/test/integration/minifi/core/NifiContainer.py
@@ -10,8 +10,12 @@ class NifiContainer(FlowContainer):
     NIFI_VERSION = '1.7.0'
     NIFI_ROOT = '/opt/nifi/nifi-' + NIFI_VERSION
 
-    def __init__(self, config_dir, name, vols, network, image_store):
-        super().__init__(config_dir, name, 'nifi', vols, network, image_store)
+    def __init__(self, config_dir, name, vols, network, image_store, command=None):
+        if not command:
+            entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && "
+                             r"cp /tmp/nifi_config/flow.xml.gz {nifi_root}/conf && /opt/nifi/scripts/start.sh").format(name=name, nifi_root=NifiContainer.NIFI_ROOT)
+            command = ["/bin/sh", "-c", entry_command]
+        super().__init__(config_dir, name, 'nifi', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Starting Flow Controller"
@@ -33,15 +37,12 @@ class NifiContainer(FlowContainer):
 
         logging.info('Creating and running nifi docker container...')
         self.__create_config()
-
-        command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && "
-                   r"cp /tmp/nifi_config/flow.xml.gz {nifi_root}/conf && /opt/nifi/scripts/start.sh").format(name=self.name, nifi_root=NifiContainer.NIFI_ROOT)
         self.client.containers.run(
             self.image_store.get_image(self.get_engine()),
             detach=True,
             name=self.name,
             hostname=self.name,
             network=self.network.name,
-            entrypoint=["/bin/sh", "-c", command],
+            entrypoint=self.command,
             volumes=self.vols)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/OPCUAServerContainer.py b/docker/test/integration/minifi/core/OPCUAServerContainer.py
new file mode 100644
index 0000000..969395c
--- /dev/null
+++ b/docker/test/integration/minifi/core/OPCUAServerContainer.py
@@ -0,0 +1,24 @@
+import logging
+from .Container import Container
+
+
+class OPCUAServerContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'opcua-server', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "TCP network layer listening on"
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running OPC UA server docker container...')
+        self.client.containers.run(
+            "lordgamez/open62541",
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            ports={'4840/tcp': 4840},
+            entrypoint=self.command)
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/PostgreSQLServerContainer.py b/docker/test/integration/minifi/core/PostgreSQLServerContainer.py
index 97c7e6c..90c3700 100644
--- a/docker/test/integration/minifi/core/PostgreSQLServerContainer.py
+++ b/docker/test/integration/minifi/core/PostgreSQLServerContainer.py
@@ -2,8 +2,8 @@ from .Container import Container
 
 
 class PostgreSQLServerContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 'postgresql-server', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'postgresql-server', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "database system is ready to accept connections"
@@ -17,4 +17,5 @@ class PostgreSQLServerContainer(Container):
             detach=True,
             name='postgresql-server',
             network=self.network.name,
-            environment=["POSTGRES_PASSWORD=password"])
+            environment=["POSTGRES_PASSWORD=password"],
+            entrypoint=self.command)
diff --git a/docker/test/integration/minifi/core/S3ServerContainer.py b/docker/test/integration/minifi/core/S3ServerContainer.py
index 0ddd9a9..565be4d 100644
--- a/docker/test/integration/minifi/core/S3ServerContainer.py
+++ b/docker/test/integration/minifi/core/S3ServerContainer.py
@@ -3,8 +3,8 @@ from .Container import Container
 
 
 class S3ServerContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 's3-server', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command):
+        super().__init__(name, 's3-server', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "Started S3MockApplication"
@@ -20,5 +20,6 @@ class S3ServerContainer(Container):
             name=self.name,
             network=self.network.name,
             ports={'9090/tcp': 9090, '9191/tcp': 9191},
-            environment=["initialBuckets=test_bucket"])
+            environment=["initialBuckets=test_bucket"],
+            entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 631c7b3..59e6ba4 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -12,6 +12,7 @@ from .AzureStorageServerContainer import AzureStorageServerContainer
 from .HttpProxyContainer import HttpProxyContainer
 from .PostgreSQLServerContainer import PostgreSQLServerContainer
 from .MqttBrokerContainer import MqttBrokerContainer
+from .OPCUAServerContainer import OPCUAServerContainer
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -54,7 +55,7 @@ class SingleNodeDockerCluster(Cluster):
         logging.debug('Creating network: %s', net_name)
         return docker.from_env().networks.create(net_name)
 
-    def acquire_container(self, name, engine='minifi-cpp'):
+    def acquire_container(self, name, engine='minifi-cpp', command=None):
         if name is not None and name in self.containers:
             return self.containers[name]
 
@@ -63,23 +64,25 @@ class SingleNodeDockerCluster(Cluster):
             logging.info('Container name was not provided; using generated name \'%s\'', name)
 
         if engine == 'nifi':
-            return self.containers.setdefault(name, NifiContainer(self.data_directories["nifi_config_dir"], name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, NifiContainer(self.data_directories["nifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'minifi-cpp':
-            return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, MinifiContainer(self.data_directories["minifi_config_dir"], name, self.vols, self.network, self.image_store, command))
         elif engine == 'kafka-broker':
             if 'zookeeper' not in self.containers:
-                self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store))
-            return self.containers.setdefault(name, KafkaBrokerContainer(name, self.vols, self.network, self.image_store))
+                self.containers.setdefault('zookeeper', ZookeeperContainer('zookeeper', self.vols, self.network, self.image_store, command))
+            return self.containers.setdefault(name, KafkaBrokerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'http-proxy':
-            return self.containers.setdefault(name, HttpProxyContainer(name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, HttpProxyContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 's3-server':
-            return self.containers.setdefault(name, S3ServerContainer(name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, S3ServerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'azure-storage-server':
-            return self.containers.setdefault(name, AzureStorageServerContainer(name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, AzureStorageServerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'postgresql-server':
-            return self.containers.setdefault(name, PostgreSQLServerContainer(name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, PostgreSQLServerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'mqtt-broker':
-            return self.containers.setdefault(name, MqttBrokerContainer(name, self.vols, self.network, self.image_store))
+            return self.containers.setdefault(name, MqttBrokerContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == 'opcua-server':
+            return self.containers.setdefault(name, OPCUAServerContainer(name, self.vols, self.network, self.image_store, command))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/minifi/core/ZookeeperContainer.py b/docker/test/integration/minifi/core/ZookeeperContainer.py
index 09ec31b..107339e 100644
--- a/docker/test/integration/minifi/core/ZookeeperContainer.py
+++ b/docker/test/integration/minifi/core/ZookeeperContainer.py
@@ -3,8 +3,8 @@ from .Container import Container
 
 
 class ZookeeperContainer(Container):
-    def __init__(self, name, vols, network, image_store):
-        super().__init__(name, 'zookeeper', vols, network, image_store)
+    def __init__(self, name, vols, network, image_store, command):
+        super().__init__(name, 'zookeeper', vols, network, image_store, command)
 
     def get_startup_finished_log_entry(self):
         return "binding to port"
@@ -19,5 +19,6 @@ class ZookeeperContainer(Container):
             detach=True,
             name='zookeeper',
             network=self.network.name,
-            ports={'2181/tcp': 2181})
+            ports={'2181/tcp': 2181},
+            entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/processors/FetchOPCProcessor.py b/docker/test/integration/minifi/processors/FetchOPCProcessor.py
new file mode 100644
index 0000000..cfa9b76
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FetchOPCProcessor.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+
+class FetchOPCProcessor(Processor):
+    def __init__(self):
+        super(FetchOPCProcessor, self).__init__(
+            'FetchOPCProcessor',
+            auto_terminate=['success', 'failure'])
diff --git a/docker/test/integration/minifi/processors/PutOPCProcessor.py b/docker/test/integration/minifi/processors/PutOPCProcessor.py
new file mode 100644
index 0000000..47950e7
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutOPCProcessor.py
@@ -0,0 +1,9 @@
+from ..core.Processor import Processor
+
+
+class PutOPCProcessor(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PutOPCProcessor, self).__init__(
+            'PutOPCProcessor',
+            auto_terminate=['success', 'failure'],
+            schedule=schedule)
diff --git a/docker/test/integration/resources/opcua/opcua_client_cert.der b/docker/test/integration/resources/opcua/opcua_client_cert.der
new file mode 100644
index 0000000..44be253
Binary files /dev/null and b/docker/test/integration/resources/opcua/opcua_client_cert.der differ
diff --git a/docker/test/integration/resources/opcua/opcua_client_key.der b/docker/test/integration/resources/opcua/opcua_client_key.der
new file mode 100644
index 0000000..91b30bf
Binary files /dev/null and b/docker/test/integration/resources/opcua/opcua_client_key.der differ
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index c2fef16..f14dfe0 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -63,19 +63,31 @@ def step_impl(context, processor_type, property, property_value, processor_name)
                           format(processor_type=processor_type, property=property, property_value=property_value, minifi_container_name="minifi-cpp-flow", processor_name=processor_name))
 
 
-@given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
-@given("a {processor_type} processor in a \"{minifi_container_name}\" flow")
-@given("a {processor_type} processor set up in a \"{minifi_container_name}\" flow")
-def step_impl(context, processor_type, minifi_container_name):
+@given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow")
+def step_impl(context, processor_type, processor_name, minifi_container_name):
     container = context.test.acquire_container(minifi_container_name)
     processor = locate("minifi.processors." + processor_type + "." + processor_type)()
-    processor.set_name(processor_type)
+    processor.set_name(processor_name)
     context.test.add_node(processor)
     # Assume that the first node declared is primary unless specified otherwise
     if not container.get_start_nodes():
         container.add_start_node(processor)
 
 
+@given("a {processor_type} processor with the name \"{processor_name}\"")
+def step_impl(context, processor_type, processor_name):
+    context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow".
+                          format(processor_type=processor_type, processor_name=processor_name, minifi_container_name="minifi-cpp-flow"))
+
+
+@given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
+@given("a {processor_type} processor in a \"{minifi_container_name}\" flow")
+@given("a {processor_type} processor set up in a \"{minifi_container_name}\" flow")
+def step_impl(context, processor_type, minifi_container_name):
+    context.execute_steps("given a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow".
+                          format(processor_type=processor_type, processor_name=processor_type, minifi_container_name=minifi_container_name))
+
+
 @given("a {processor_type} processor")
 @given("a {processor_type} processor set up to communicate with an s3 server")
 @given("a {processor_type} processor set up to communicate with the same s3 server")
@@ -344,6 +356,17 @@ def step_impl(context):
     context.test.acquire_container("postgresql-server", "postgresql-server")
 
 
+# OPC UA
+@given("an OPC UA server is set up")
+def step_impl(context):
+    context.test.acquire_container("opcua-server", "opcua-server")
+
+
+@given("an OPC UA server is set up with access control")
+def step_impl(context):
+    context.test.acquire_container("opcua-server", "opcua-server", ["/opt/open62541/examples/access_control_server"])
+
+
 @when("the MiNiFi instance starts up")
 @when("both instances start up")
 @when("all instances start up")
@@ -561,6 +584,11 @@ def step_impl(context, regex, duration):
     context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
 
 
+@then("the OPC UA server logs contain the following message: \"{log_message}\" in less than {duration}")
+def step_impl(context, log_message, duration):
+    context.test.check_container_log_contents("opcua-server", log_message, timeparse(duration))
+
+
 # MQTT
 @then("the MQTT broker has a log line matching \"{log_pattern}\"")
 def step_impl(context, log_pattern):
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index 5652dd2..64bc443 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -91,6 +91,10 @@ class FetchOPCProcessor : public BaseOPCProcessor {
   bool lazy_mode_;
 
  private:
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
   std::vector<UA_NodeId> translatedNodeIDs_;  // Only used when user provides path, path->nodeid translation is only done once
   std::unordered_map<std::string, std::string> node_timestamp_;  // Key = Full path, Value = Timestamp
 };
diff --git a/extensions/opc/include/opc.h b/extensions/opc/include/opc.h
index 9646f3d..6472846 100644
--- a/extensions/opc/include/opc.h
+++ b/extensions/opc/include/opc.h
@@ -66,7 +66,7 @@ class Client {
   UA_StatusCode update_node(const UA_NodeId nodeId, T value);
 
   template<typename T>
-  UA_StatusCode add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, T value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+  UA_StatusCode add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, T value, UA_NodeId *receivedNodeId);
 
   static std::unique_ptr<Client> createClient(std::shared_ptr<core::logging::Logger> logger, const std::string& applicationURI,
                                               const std::vector<char>& certBuffer, const std::vector<char>& keyBuffer,
@@ -125,8 +125,6 @@ static std::map<std::string, OPCNodeDataType>  StringToOPCDataTypeMap = {{"Int64
                                                                          {"UInt32", OPCNodeDataType::UInt32}, {"Boolean", OPCNodeDataType::Boolean}, {"Float", OPCNodeDataType::Float},
                                                                          {"Double", OPCNodeDataType::Double}, {"String", OPCNodeDataType::String}};
 
-int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt);
-
 std::string nodeValue2String(const NodeData& nd);
 
 std::string OPCDateTime2String(UA_DateTime raw_date);
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index a18a583..4cb9707 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -80,6 +80,10 @@ class PutOPCProcessor : public BaseOPCProcessor {
     std::shared_ptr<core::logging::Logger> logger_;
   };
 
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
   std::string nodeID_;
   int32_t nameSpaceIdx_;
   opc::OPCNodeIDType idType_;
diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp
index a85d36b..050246f 100644
--- a/extensions/opc/src/opc.cpp
+++ b/extensions/opc/src/opc.cpp
@@ -32,7 +32,6 @@
 #include "open62541/client_highlevel.h"
 #include "open62541/client_config_default.h"
 
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -177,7 +176,10 @@ Client::~Client() {
   if (client_ == nullptr) {
     return;
   }
-  if (UA_Client_getState(client_) != UA_CLIENTSTATE_DISCONNECTED) {
+
+  UA_SecureChannelState channel_state;
+  UA_Client_getState(client_, &channel_state, nullptr, nullptr);
+  if (channel_state != UA_SECURECHANNELSTATE_CLOSED) {
     auto sc = UA_Client_disconnect(client_);
     if (sc != UA_STATUSCODE_GOOD) {
       logger_->log_warn("Failed to disconnect OPC client: %s", UA_StatusCode_name(sc));
@@ -190,14 +192,17 @@ bool Client::isConnected() {
   if (!client_) {
     return false;
   }
-  return UA_Client_getState(client_) != UA_CLIENTSTATE_DISCONNECTED;
+
+  UA_SessionState session_state;
+  UA_Client_getState(client_, nullptr, &session_state, nullptr);
+  return session_state == UA_SESSIONSTATE_ACTIVATED;
 }
 
 UA_StatusCode Client::connect(const std::string& url, const std::string& username, const std::string& password) {
   if (username.empty()) {
     return UA_Client_connect(client_, url.c_str());
   } else {
-    return UA_Client_connect_username(client_, url.c_str(), username.c_str(), password.c_str());
+    return UA_Client_connectUsername(client_, url.c_str(), username.c_str(), password.c_str());
   }
 }
 
@@ -241,7 +246,7 @@ NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const std::stri
       auto server_timestamp = OPCDateTime2String(dv->serverTimestamp);
       auto source_timestamp = OPCDateTime2String(dv->sourceTimestamp);
       nodedata.attributes["Sourcetimestamp"] = source_timestamp;
-      UA_ReadResponse_deleteMembers(&response);
+      UA_ReadResponse_clear(&response);
 
       nodedata.dataTypeID = var->type->typeIndex;
       nodedata.addVariant(var);
@@ -303,10 +308,10 @@ void Client::traverse(UA_NodeId nodeId, std::function<nodeFoundCallBackFunc> cb,
   UA_BrowseResponse bResp = UA_Client_Service_browse(client_, bReq);
 
   const auto guard = gsl::finally([&bResp]() {
-    UA_BrowseResponse_deleteMembers(&bResp);
+    UA_BrowseResponse_clear(&bResp);
   });
 
-  UA_BrowseRequest_deleteMembers(&bReq);
+  UA_BrowseRequest_clear(&bReq);
 
   for (size_t i = 0; i < bResp.resultsSize; ++i) {
     for (size_t j = 0; j < bResp.results[i].referencesSize; ++j) {
@@ -364,7 +369,7 @@ UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& pa
   UA_TranslateBrowsePathsToNodeIdsResponse response = UA_Client_Service_translateBrowsePathsToNodeIds(client_, request);
 
   const auto guard = gsl::finally([&browsePath]() {
-    UA_BrowsePath_deleteMembers(&browsePath);
+    UA_BrowsePath_clear(&browsePath);
   });
 
   if (response.resultsSize < 1) {
@@ -385,7 +390,7 @@ UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& pa
     }
   }
 
-  UA_TranslateBrowsePathsToNodeIdsResponse_deleteMembers(&response);
+  UA_TranslateBrowsePathsToNodeIdsResponse_clear(&response);
 
   if (foundData) {
     logger->log_debug("Found %lu nodes for path %s", foundNodeIDs.size(), path.c_str());
@@ -397,7 +402,7 @@ UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& pa
 }
 
 template<typename T>
-UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, T value, OPCNodeDataType dt, UA_NodeId *receivedNodeId) {
+UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, T value, UA_NodeId *receivedNodeId) {
   UA_VariableAttributes attr = UA_VariableAttributes_default;
   add_value_to_variant(&attr.value, value);
   char local[6] = "en-US";
@@ -405,7 +410,7 @@ UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId tar
   UA_StatusCode sc = UA_Client_addVariableNode(client_,
                                                targetNodeId,
                                                parentNodeId,
-                                               UA_NODEID_NUMERIC(0, OPCNodeDataTypeToTypeID(dt)),
+                                               UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
                                                UA_QUALIFIEDNAME(1, const_cast<char*>(browseName.c_str())),
                                                UA_NODEID_NULL,
                                                attr, receivedNodeId);
@@ -443,40 +448,17 @@ template UA_StatusCode Client::update_node<bool>(const UA_NodeId nodeId, bool va
 template UA_StatusCode Client::update_node<const char *>(const UA_NodeId nodeId, const char * value);
 template UA_StatusCode Client::update_node<std::string>(const UA_NodeId nodeId, std::string value);
 
-template UA_StatusCode Client::add_node<int64_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, int64_t value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<uint64_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint64_t value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<int32_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, int32_t value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<uint32_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint32_t value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<float>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, float value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<double>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, double value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-template UA_StatusCode Client::add_node<bool>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, bool value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<int64_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, int64_t value, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<uint64_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint64_t value, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<int32_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, int32_t value, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<uint32_t>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint32_t value, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<float>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, float value, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<double>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, double value, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<bool>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, bool value, UA_NodeId *receivedNodeId);
 template UA_StatusCode Client::add_node<const char *>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName,
-    const char * value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+    const char * value, UA_NodeId *receivedNodeId);
 template UA_StatusCode Client::add_node<std::string>(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName,
-    std::string value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
-
-int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt) {
-  switch (dt) {
-    case OPCNodeDataType::Boolean:
-      return UA_NS0ID_BOOLEAN;
-    case OPCNodeDataType::Int32:
-      return UA_NS0ID_INT32;
-    case OPCNodeDataType::UInt32:
-      return UA_NS0ID_UINT32;
-    case OPCNodeDataType::Int64:
-      return UA_NS0ID_INT64;
-    case OPCNodeDataType::UInt64:
-      return UA_NS0ID_UINT64;
-    case OPCNodeDataType::Float:
-      return UA_NS0ID_FLOAT;
-    case OPCNodeDataType::Double:
-      return UA_NS0ID_DOUBLE;
-    case OPCNodeDataType::String:
-      return UA_NS0ID_STRING;
-    default:
-      throw OPCException(GENERAL_EXCEPTION, "Data type is not supported");
-  }
-}
+    std::string value, UA_NodeId *receivedNodeId);
 
 std::string nodeValue2String(const NodeData& nd) {
   std::string ret_val;
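The dt parameter could be dropped because the UA data type now travels
inside the variant that add_value_to_variant fills, while the fourth
argument of UA_Client_addVariableNode is the reference type (Organizes),
not the value's data type. A minimal sketch of the same pattern against
the raw open62541 1.2 API, assuming an already-connected client (node ids
and the browse name are illustrative):

    #include <open62541/client_highlevel.h>

    UA_VariableAttributes attr = UA_VariableAttributes_default;
    UA_Int32 value = 42;
    /* the data type is carried by the variant itself */
    UA_Variant_setScalarCopy(&attr.value, &value, &UA_TYPES[UA_TYPES_INT32]);
    UA_StatusCode sc = UA_Client_addVariableNode(
        client,
        UA_NODEID_NULL,                               /* server assigns the id */
        UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), /* parent node */
        UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),     /* reference type */
        UA_QUALIFIEDNAME(1, const_cast<char*>("exampleNode")),
        UA_NODEID_NULL,                               /* no type definition */
        attr, nullptr);
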
diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp
index e675fea..03b18d7 100644
--- a/extensions/opc/src/opcbase.cpp
+++ b/extensions/opc/src/opcbase.cpp
@@ -44,14 +44,13 @@ namespace processors {
           ->withDescription("Application URI of the client in the format 'urn:unconfigured:application'. "
                             "Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names.")->build());
 
-
   core::Property BaseOPCProcessor::Username(
       core::PropertyBuilder::createProperty("Username")
           ->withDescription("Username to log in with.")->build());
 
   core::Property BaseOPCProcessor::Password(
       core::PropertyBuilder::createProperty("Password")
-          ->withDescription("Password to log in with. Providing this requires cert and key to be provided as well, credentials are always sent encrypted.")->build());
+          ->withDescription("Password to log in with.")->build());
 
   core::Property BaseOPCProcessor::CertificatePath(
       core::PropertyBuilder::createProperty("Certificate path")
@@ -84,45 +83,41 @@ namespace processors {
 
     auto certificatePathRes = context->getProperty(CertificatePath.getName(), certpath_);
     auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_);
-    auto trustedPathRes = context->getProperty(TrustedPath.getName(), trustpath_);
-    if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
+    context->getProperty(TrustedPath.getName(), trustpath_);
+    if (certificatePathRes != keyPathRes) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path and Key path should be provided!");
     }
 
-    if (!password_.empty() && (certpath_.empty() || keypath_.empty() || trustpath_.empty() || applicationURI_.empty())) {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
+    if (certpath_.empty()) {
+      return;
+    }
+    if (applicationURI_.empty()) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!");
     }
 
-    if (!certpath_.empty()) {
-      if (applicationURI_.empty()) {
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!");
-      }
+    std::ifstream input_cert(certpath_, std::ios::binary);
+    if (input_cert.good()) {
+      certBuffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_cert), {});
+    }
+    std::ifstream input_key(keypath_, std::ios::binary);
+    if (input_key.good()) {
+      keyBuffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_key), {});
+    }
 
-      std::ifstream input_cert(certpath_, std::ios::binary);
-      if (input_cert.good()) {
-        certBuffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_cert), {});
-      }
-      std::ifstream input_key(keypath_, std::ios::binary);
-      if (input_key.good()) {
-        keyBuffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_key), {});
-      }
+    if (certBuffer_.empty()) {
+      auto error_msg = utils::StringUtils::join_pack("Failed to load cert from path: ", certpath_);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
+    }
+    if (keyBuffer_.empty()) {
+      auto error_msg = utils::StringUtils::join_pack("Failed to load key from path: ", keypath_);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
+    }
 
-      trustBuffers_.emplace_back();
+    if (!trustpath_.empty()) {
       std::ifstream input_trust(trustpath_, std::ios::binary);
       if (input_trust.good()) {
-        trustBuffers_[0] = std::vector<char>(std::istreambuf_iterator<char>(input_trust), {});
-      }
-
-      if (certBuffer_.empty()) {
-        auto error_msg = utils::StringUtils::join_pack("Failed to load cert from path: ", certpath_);
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-      if (keyBuffer_.empty()) {
-        auto error_msg = utils::StringUtils::join_pack("Failed to load key from path: ", keypath_);
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-
-      if (trustBuffers_[0].empty()) {
+        trustBuffers_.push_back(std::vector<char>(std::istreambuf_iterator<char>(input_trust), {}));
+      } else {
         auto error_msg = utils::StringUtils::join_pack("Failed to load trusted server certs from path: ", trustpath_);
         throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
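The reworked checks boil down to the following: Certificate path and Key
path must be set together, Trusted server certificate path is optional on
its own, Application URI becomes mandatory once a certificate is given,
and Username/Password no longer require any of the certificate settings.
A condensed sketch of the same rules (illustrative variable names, not
the literal members):

    const bool certs_consistent = certpath.empty() == keypath.empty();
    if (!certs_consistent) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION,
          "All or none of Certificate path and Key path should be provided!");
    }
    if (!certpath.empty() && application_uri.empty()) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION,
          "Application URI must be provided if Certificate path is provided!");
    }
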
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 8e4368a..02e2ddd 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -353,28 +353,28 @@ namespace processors {
         switch (nodeDataType_) {
           case opc::OPCNodeDataType::Int64: {
             int64_t value = std::stoll(contentstr);
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
             break;
           }
           case opc::OPCNodeDataType::UInt64: {
             uint64_t value = std::stoull(contentstr);
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
             break;
           }
           case opc::OPCNodeDataType::Int32: {
             int32_t value = std::stoi(contentstr);
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
             break;
           }
           case opc::OPCNodeDataType::UInt32: {
             uint32_t value = std::stoul(contentstr);
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
             break;
           }
           case opc::OPCNodeDataType::Boolean: {
             const auto contentstr_parsed = utils::StringUtils::toBool(contentstr);
             if (contentstr_parsed) {
-              sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr_parsed.value(), nodeDataType_, &resultnode);
+              sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr_parsed.value(), &resultnode);
             } else {
               throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
             }
@@ -382,16 +382,16 @@ namespace processors {
           }
           case opc::OPCNodeDataType::Float: {
             float value = std::stof(contentstr);
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
             break;
           }
           case opc::OPCNodeDataType::Double: {
             double value = std::stod(contentstr);
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, &resultnode);
             break;
           }
           case opc::OPCNodeDataType::String: {
-            sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr, nodeDataType_, &resultnode);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr, &resultnode);
             break;
           }
           default:
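Note that the std::sto* conversions above throw std::invalid_argument or
std::out_of_range when the flow file content is not a valid number of the
selected type; a caller-side guard (an illustrative helper, not part of
this change) could look like this:

    #include <optional>
    #include <string>

    std::optional<int64_t> parseInt64(const std::string& s) {
      try {
        size_t pos = 0;
        const int64_t v = std::stoll(s, &pos);
        if (pos != s.size()) {
          return std::nullopt;  // reject trailing garbage such as "42abc"
        }
        return v;
      } catch (const std::exception&) {  // invalid_argument / out_of_range
        return std::nullopt;
      }
    }
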
diff --git a/thirdparty/open62541/open62541.patch b/thirdparty/open62541/open62541.patch
index 2d36e2a..17a5ab1 100644
--- a/thirdparty/open62541/open62541.patch
+++ b/thirdparty/open62541/open62541.patch
@@ -1,5 +1,5 @@
 diff --git a/CMakeLists.txt b/CMakeLists.txt
-index d426e1da..5f1a4044 100644
+index 9184b943..44ae77cf 100644
 --- a/CMakeLists.txt
 +++ b/CMakeLists.txt
 @@ -7,7 +7,7 @@ endif()
@@ -11,11 +11,20 @@ index d426e1da..5f1a4044 100644
  find_package(PythonInterp REQUIRED)
  find_package(Git)
  include(AssignSourceGroup)
-@@ -416,17 +416,17 @@ if(NOT UA_COMPILE_AS_CXX AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID
+@@ -526,7 +526,7 @@ if(NOT UA_FORCE_CPP AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" ST
+     check_add_cc_flag("-Wall")      # Warnings
+     check_add_cc_flag("-Wextra")    # More warnings
+     check_add_cc_flag("-Wpedantic") # Standard compliance
+-    check_add_cc_flag("-Werror")    # All warnings are errors
++    # check_add_cc_flag("-Werror")    # All warnings are errors
+ 
+     check_add_cc_flag("-Wno-static-in-inline") # Clang doesn't like the use of static inline methods inside static inline methods
+     check_add_cc_flag("-Wno-overlength-strings") # May happen in the nodeset compiler when complex values are directly encoded
+@@ -576,17 +576,17 @@ if(NOT UA_FORCE_CPP AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" ST
  
          # IPO requires too much memory for unit tests
          # GCC docu recommends to compile all files with the same options, therefore ignore it completely
--        if(NOT UA_BUILD_UNIT_TESTS)
+-        if(NOT UA_BUILD_UNIT_TESTS AND NOT DEFINED CMAKE_INTERPROCEDURAL_OPTIMIZATION)
 -            # needed to check if IPO is supported (check needs cmake > 3.9)
 -            if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
 -                cmake_policy(SET CMP0069 NEW) # needed as long as required cmake < 3.9
@@ -26,30 +35,17 @@ index d426e1da..5f1a4044 100644
 -                endif()
 -            endif()
 -        endif()
-+        #if(NOT UA_BUILD_UNIT_TESTS)
-+        #    # needed to check if IPO is supported (check needs cmake > 3.9)
-+        #    if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
-+        #        cmake_policy(SET CMP0069 NEW) # needed as long as required cmake < 3.9
-+        #        include(CheckIPOSupported)
-+        #        check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural Optimization / Link Time Optimization (should be same as -flto)
-+        #        if(CC_HAS_IPO)
-+        #            set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
-+        #        endif()
-+        #    endif()
-+        #endif()
++       # if(NOT UA_BUILD_UNIT_TESTS AND NOT DEFINED CMAKE_INTERPROCEDURAL_OPTIMIZATION)
++       #     # needed to check if IPO is supported (check needs cmake > 3.9)
++       #     if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
++       #         cmake_policy(SET CMP0069 NEW) # needed as long as required cmake < 3.9
++       #         include(CheckIPOSupported)
++       #         check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural Optimization / Link Time Optimization (should be same as -flto)
++       #         if(CC_HAS_IPO)
++       #            set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
++       #         endif()
++       #     endif()
++       # endif()
      endif()
  
      if(UA_ENABLE_AMALGAMATION)
-diff --git a/CMakeLists.txt b/CMakeLists.txt
-index 94ceb127..5d0ea914 100644
---- a/CMakeLists.txt
-+++ b/CMakeLists.txt
-@@ -405,7 +405,7 @@ if(NOT UA_COMPILE_AS_CXX AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID
-                     -fno-strict-aliasing # fewer compiler assumptions about pointer types
-                     -fexceptions # recommended for multi-threaded C code, also in combination with C++ code
-                     )
--    set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Werror")
-+    # set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Werror")
- 
-     if (NOT MINGW)
-         if(UA_ENABLE_HARDENING)
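
The patch above is applied to the bundled open62541 1.2.2 sources at
configure time. A sketch of the usual superbuild wiring (hypothetical
target name and options; the real setup lives in
cmake/BundledOpen62541.cmake):

    include(ExternalProject)
    ExternalProject_Add(open62541-external
        URL https://github.com/open62541/open62541/archive/v1.2.2.tar.gz
        PATCH_COMMAND patch -p1 < "${CMAKE_SOURCE_DIR}/thirdparty/open62541/open62541.patch"
        CMAKE_ARGS -DUA_ENABLE_ENCRYPTION=ON)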