You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/17 00:21:47 UTC
[nifi-minifi-cpp] 01/04: MINIFICPP-2035 NiFi flow json format support
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ce288b920d2ac6dd1a445524b81aaf88baa501a9
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Feb 17 01:09:38 2023 +0100
MINIFICPP-2035 NiFi flow json format support
Closes #1494
Signed-off-by: Marton Szasz <sz...@apache.org>
---
.../integration/MiNiFi_integration_test_driver.py | 3 +
docker/test/integration/cluster/ContainerStore.py | 3 +
.../test/integration/cluster/DockerTestCluster.py | 3 +
.../cluster/containers/MinifiContainer.py | 9 +-
docker/test/integration/features/s2s.feature | 7 +-
docker/test/integration/features/script.feature | 3 +-
docker/test/integration/features/sql.feature | 3 +-
.../Minifi_flow_json_serializer.py | 141 +++++++++++++
docker/test/integration/steps/steps.py | 5 +
extensions/http-curl/protocols/RESTSender.cpp | 1 +
.../tests/unit/AdaptiveConfigurationTests.cpp | 66 ++++++
.../tests/unit/FlowJsonTests.cpp | 163 +++++++++++++++
.../tests/unit/YamlConfigurationTests.cpp | 153 +++-----------
libminifi/CMakeLists.txt | 2 +-
libminifi/include/RemoteProcessorGroupPort.h | 4 +
libminifi/include/core/ProcessGroup.h | 2 -
libminifi/include/core/ProcessorConfig.h | 6 +-
.../AdaptiveConfiguration.h} | 31 +--
libminifi/include/core/flow/CheckRequiredField.h | 6 +-
libminifi/include/core/flow/FlowSchema.h | 88 ++++++++
libminifi/include/core/flow/Node.h | 59 +++++-
.../include/core/flow/StructuredConfiguration.h | 28 ++-
.../include/core/flow/StructuredConnectionParser.h | 10 +-
libminifi/include/core/json/JsonNode.h | 19 ++
.../include/core/state/nodes/SchedulingNodes.h | 4 +-
libminifi/include/core/yaml/YamlConfiguration.h | 9 -
libminifi/include/core/yaml/YamlNode.h | 16 +-
libminifi/src/core/ConfigurationFactory.cpp | 14 +-
libminifi/src/core/ProcessGroup.cpp | 6 +-
libminifi/src/core/flow/AdaptiveConfiguration.cpp | 67 ++++++
libminifi/src/core/flow/CheckRequiredField.cpp | 18 +-
libminifi/src/core/flow/FlowSchema.cpp | 144 +++++++++++++
libminifi/src/core/flow/Node.cpp | 10 +-
.../src/core/flow/StructuredConfiguration.cpp | 227 +++++++++++----------
.../src/core/flow/StructuredConnectionParser.cpp | 29 ++-
libminifi/src/core/json/JsonConfiguration.cpp | 89 --------
libminifi/src/core/yaml/YamlConfiguration.cpp | 24 +--
libminifi/test/ConfigurationTestController.h | 56 +++++
minifi_main/MiNiFiMain.cpp | 2 +-
39 files changed, 1076 insertions(+), 454 deletions(-)
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index e3153dffe..21dec70b4 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -315,3 +315,6 @@ class MiNiFi_integration_test:
def enable_sql_in_minifi(self):
self.cluster.enable_sql_in_minifi()
+
+ def set_yaml_in_minifi(self):
+ self.cluster.set_yaml_in_minifi()
diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py
index 867e41076..fb5f9c6b8 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -160,6 +160,9 @@ class ContainerStore:
def enable_sql_in_minifi(self):
self.minifi_options.enable_sql = True
+ def set_yaml_in_minifi(self):
+ self.minifi_options.config_format = "yaml"
+
def get_startup_finished_log_entry(self, container_name):
return self.containers[container_name].get_startup_finished_log_entry()
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index 9688c732e..0a07119a5 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -85,6 +85,9 @@ class DockerTestCluster:
def enable_sql_in_minifi(self):
self.container_store.enable_sql_in_minifi()
+ def set_yaml_in_minifi(self):
+ self.container_store.set_yaml_in_minifi()
+
def get_app_log(self, container_name):
log_source = self.container_store.log_source(container_name)
if log_source == LogSource.FROM_DOCKER_CONTAINER:
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
index 93d65ddda..7f70d4fb7 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -21,6 +21,7 @@ import shutil
import copy
from .FlowContainer import FlowContainer
from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+from minifi.flow_serialization.Minifi_flow_json_serializer import Minifi_flow_json_serializer
class MinifiOptions:
@@ -30,6 +31,7 @@ class MinifiOptions:
self.enable_provenance = False
self.enable_prometheus = False
self.enable_sql = False
+ self.config_format = "json"
class MinifiContainer(FlowContainer):
@@ -58,7 +60,12 @@ class MinifiContainer(FlowContainer):
return "Starting Flow Controller"
def _create_config(self):
- serializer = Minifi_flow_yaml_serializer()
+ if self.options.config_format == "yaml":
+ serializer = Minifi_flow_yaml_serializer()
+ elif self.options.config_format == "json":
+ serializer = Minifi_flow_json_serializer()
+ else:
+ assert False, "Invalid flow configuration format: {}".format(self.options.config_format)
test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers)
logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
with open(os.path.join(self.container_specific_config_dir, "config.yml"), 'wb') as config_file:
diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature
index a836a50c5..a51589a02 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/docker/test/integration/features/s2s.feature
@@ -35,7 +35,8 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
Scenario: Zero length files are transfered between via s2s if the "drop empty" connection property is false
- Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ Given a MiNiFi CPP server with yaml config
+ And a GenerateFlowFile processor with the "File Size" property set to "0B"
And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
@@ -47,7 +48,9 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds
Scenario: Zero length files are not transfered between via s2s if the "drop empty" connection property is true
- Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ # "drop empty" is only supported with yaml config
+ Given a MiNiFi CPP server with yaml config
+ And a GenerateFlowFile processor with the "File Size" property set to "0B"
And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
And the connection going to the RemoteProcessGroup has "drop empty" set
diff --git a/docker/test/integration/features/script.feature b/docker/test/integration/features/script.feature
index 4a31b3c42..b8ca2e5f6 100644
--- a/docker/test/integration/features/script.feature
+++ b/docker/test/integration/features/script.feature
@@ -29,7 +29,8 @@ Feature: MiNiFi can execute Lua and Python scripts
Then the Minifi logs contain the following message: "Sleeping forever" 3 times after 5 seconds
Scenario: ExecuteScript should only allow one Python script running at a time
- Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ Given a MiNiFi CPP server with yaml config
+ And a GenerateFlowFile processor with the "File Size" property set to "0B"
And the scheduling period of the GenerateFlowFile processor is set to "500 ms"
And a ExecuteScript processor with the "Script File" property set to "/tmp/resources/python/sleep_forever.py"
And the "Script Engine" property of the ExecuteScript processor is set to "python"
diff --git a/docker/test/integration/features/sql.feature b/docker/test/integration/features/sql.feature
index 768ed4d7e..bafab8e95 100644
--- a/docker/test/integration/features/sql.feature
+++ b/docker/test/integration/features/sql.feature
@@ -33,7 +33,8 @@ Feature: Executing SQL operations from MiNiFi-C++
Then the query "SELECT * FROM test_table WHERE int_col = 42" returns 1 rows in less than 120 seconds on the PostgreSQL server
Scenario: A MiNiFi instance can query to test table with ExecuteSQL processor
- Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+ Given a MiNiFi CPP server with yaml config
+ And a GenerateFlowFile processor with the "File Size" property set to "0B"
And a UpdateAttribute processor with the "sql.args.1.value" property set to "apple"
And the "sql.args.2.value" property of the UpdateAttribute processor is set to "banana"
And a ExecuteSQL processor with the "SQL select query" property set to "SELECT * FROM test_table WHERE text_col = ? OR text_col = ? ORDER BY int_col DESC"
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
new file mode 100644
index 000000000..4468a6107
--- /dev/null
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import uuid
+import json
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+from ..core.Funnel import Funnel
+
+
+class Minifi_flow_json_serializer:
+ def serialize(self, start_nodes, controllers):
+ res = {
+ 'rootGroup': {
+ 'name': 'MiNiFi Flow',
+ 'processors': [],
+ 'funnels': [],
+ 'connections': [],
+ 'remoteProcessGroups': [],
+ 'controllerServices': []
+ }
+ }
+ visited = []
+
+ for node in start_nodes:
+ self.serialize_node(node, res['rootGroup'], visited)
+
+ for controller in controllers:
+ self.serialize_controller(controller, res['rootGroup'])
+
+ return json.dumps(res)
+
+ def serialize_node(self, connectable, root, visited):
+ visited.append(connectable)
+
+ if hasattr(connectable, 'name'):
+ connectable_name = connectable.name
+ else:
+ connectable_name = str(connectable.uuid)
+
+ if isinstance(connectable, InputPort):
+ group = connectable.remote_process_group
+ res_group = None
+
+ for res_group_candidate in root['remoteProcessGroups']:
+ assert isinstance(res_group_candidate, dict)
+ if res_group_candidate['identifier'] == str(group.uuid):
+ res_group = res_group_candidate
+
+ if res_group is None:
+ res_group = {
+ 'name': group.name,
+ 'identifier': str(group.uuid),
+ 'targetUri': group.url,
+ 'communicationsTimeout': '30 sec',
+ 'yieldDuration': '3 sec',
+ 'inputPorts': []
+ }
+
+ root['remoteProcessGroups'].append(res_group)
+
+ res_group['inputPorts'].append({
+ 'identifier': str(connectable.uuid),
+ 'name': connectable.name
+ })
+
+ if isinstance(connectable, Processor):
+ root['processors'].append({
+ 'name': connectable_name,
+ 'identifier': str(connectable.uuid),
+ 'type': connectable.class_prefix + connectable.clazz,
+ 'schedulingStrategy': connectable.schedule['scheduling strategy'],
+ 'schedulingPeriod': connectable.schedule['scheduling period'],
+ 'penaltyDuration': connectable.schedule['penalization period'],
+ 'yieldDuration': connectable.schedule['yield period'],
+ 'runDurationMillis': connectable.schedule['run duration nanos'],
+ 'properties': connectable.properties,
+ 'autoTerminatedRelationships': connectable.auto_terminate,
+ 'concurrentlySchedulableTaskCount': connectable.max_concurrent_tasks
+ })
+
+ for svc in connectable.controller_services:
+ if svc in visited:
+ continue
+
+ visited.append(svc)
+ root['controllerServices'].append({
+ 'name': svc.name,
+ 'identifier': svc.id,
+ 'type': svc.service_class,
+ 'properties': svc.properties
+ })
+
+ if isinstance(connectable, Funnel):
+ root['funnels'].append({
+ 'identifier': str(connectable.uuid)
+ })
+
+ for conn_name in connectable.connections:
+ conn_procs = connectable.connections[conn_name]
+
+ if not isinstance(conn_procs, list):
+ conn_procs = [conn_procs]
+
+ for proc in conn_procs:
+ root['connections'].append({
+ 'name': str(uuid.uuid4()),
+ 'source': {'id': str(connectable.uuid)},
+ 'destination': {'id': str(proc.uuid)}
+ })
+ if (all(str(connectable.uuid) != x['identifier'] for x in root['funnels'])):
+ root['connections'][-1]['selectedRelationships'] = [conn_name]
+ if proc not in visited:
+ self.serialize_node(proc, root, visited)
+
+ def serialize_controller(self, controller, root):
+ if hasattr(controller, 'name'):
+ connectable_name = controller.name
+ else:
+ connectable_name = str(controller.uuid)
+
+ root['controllerServices'].append({
+ 'name': connectable_name,
+ 'identifier': controller.id,
+ 'type': controller.service_class,
+ 'properties': controller.properties
+ })
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 686c33ebe..cce5eaf30 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1037,3 +1037,8 @@ def step_impl(context, size: str, duration: str) -> None:
@then(u'the memory usage of the agent decreases to {peak_usage_percent}% peak usage in less than {duration}')
def step_impl(context, peak_usage_percent: str, duration: str) -> None:
context.test.check_memory_usage_compared_to_peak(float(peak_usage_percent) * 0.01, humanfriendly.parse_timespan(duration))
+
+
+@given(u'a MiNiFi CPP server with yaml config')
+def step_impl(context):
+ context.test.set_yaml_in_minifi()
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index b58bf3233..ad3036cbe 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -162,6 +162,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct
if (payload.getOperation() == Operation::TRANSFER) {
auto read = std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max());
client.setReadCallback(std::move(read));
+ client.setRequestHeader("Accept", "application/vnd.minifi-c2+json;version=1, text/yml");
} else {
// Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible
// TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
diff --git a/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
new file mode 100644
index 000000000..7bf8ddd27
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "ConfigurationTestController.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+TEST_CASE("Adaptive configuration can parse JSON") {
+ ConfigurationTestController controller;
+
+ const char* json_config = R"(
+ {
+ "Flow Controller": {"name": "root"},
+ "Processors": [
+ {
+ "id": "00000000-0000-0000-0000-000000000001",
+ "class": "DummyProcessor",
+ "name": "Proc1"
+ }
+ ],
+ "Connections": []
+ }
+ )";
+
+ core::flow::AdaptiveConfiguration config{controller.getContext()};
+
+ auto root = config.getRootFromPayload(json_config);
+
+ REQUIRE(root->findProcessorByName("Proc1"));
+}
+
+TEST_CASE("Adaptive configuration can parse YAML") {
+ ConfigurationTestController controller;
+
+ const char* yaml_config = R"(
+Flow Controller:
+ name: root
+Processors:
+- id: 00000000-0000-0000-0000-000000000001
+ class: DummyProcessor
+ name: Proc1
+Connections: []
+ )";
+
+ core::flow::AdaptiveConfiguration config{controller.getContext()};
+
+ auto root = config.getRootFromPayload(yaml_config);
+
+ REQUIRE(root->findProcessorByName("Proc1"));
+}
diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
new file mode 100644
index 000000000..b7d1a47f8
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <memory>
+#include <chrono>
+#include "core/repository/VolatileContentRepository.h"
+#include "core/ProcessGroup.h"
+#include "core/RepositoryFactory.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "utils/TestUtils.h"
+#include "utils/StringUtils.h"
+#include "ConfigurationTestController.h"
+#include "Funnel.h"
+
+using namespace std::literals::chrono_literals;
+
+TEST_CASE("NiFi flow json format is correctly parsed") {
+ ConfigurationTestController test_controller;
+
+ core::flow::AdaptiveConfiguration config(test_controller.getContext());
+
+ static const std::string CONFIG_JSON =
+ R"(
+{
+ "rootGroup": {
+ "name": "MiNiFi Flow",
+ "processors": [{
+ "identifier": "00000000-0000-0000-0000-000000000001",
+ "name": "MyGenFF",
+ "type": "org.apache.nifi.processors.standard.GenerateFlowFile",
+ "concurrentlySchedulableTaskCount": 15,
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "schedulingPeriod": "3 sec",
+ "penaltyDuration": "12 sec",
+ "yieldDuration": "4 sec",
+ "runDurationMillis": 12,
+ "autoTerminatedRelationships": ["one", "two"],
+ "properties": {
+ "File Size": "10 B",
+ "Batch Size": 12
+ }
+ }],
+ "funnels": [{
+ "identifier": "00000000-0000-0000-0000-000000000010",
+ "name": "CoolFunnel"
+ }],
+ "connections": [{
+ "identifier": "00000000-0000-0000-0000-000000000002",
+ "name": "GenToFunnel",
+ "source": {
+ "id": "00000000-0000-0000-0000-000000000001",
+ "name": "MyGenFF"
+ },
+ "destination": {
+ "id": "00000000-0000-0000-0000-000000000010",
+ "name": "CoolFunnel"
+ },
+ "selectedRelationships": ["a", "b"],
+ "backPressureObjectThreshold": 7,
+ "backPressureDataSizeThreshold": "11 KB",
+ "flowFileExpiration": "13 sec"
+ }, {
+ "identifier": "00000000-0000-0000-0000-000000000008",
+ "name": "FunnelToS2S",
+ "source": {
+ "id": "00000000-0000-0000-0000-000000000010",
+ "name": "CoolFunnel"
+ },
+ "destination": {
+ "id": "00000000-0000-0000-0000-000000000003",
+ "name": "AmazingInputPort"
+ },
+ "selectedRelationships": ["success"]
+ }],
+ "remoteProcessGroups": [{
+ "name": "NiFi Flow",
+ "targetUri": "https://localhost:8090/nifi",
+ "yieldDuration": "6 sec",
+ "communicationsTimeout": "19 sec",
+ "inputPorts": [{
+ "identifier": "00000000-0000-0000-0000-000000000003",
+ "name": "AmazingInputPort",
+ "targetId": "00000000-0000-0000-0000-000000000005",
+ "concurrentlySchedulableTaskCount": 7
+ }]
+ }]
+ }
+})";
+
+ std::unique_ptr<core::ProcessGroup> flow = config.getRootFromPayload(CONFIG_JSON);
+ REQUIRE(flow);
+
+ // verify processor
+ auto* proc = flow->findProcessorByName("MyGenFF");
+ REQUIRE(proc);
+ REQUIRE(proc->getUUIDStr() == "00000000-0000-0000-0000-000000000001");
+ REQUIRE(15 == proc->getMaxConcurrentTasks());
+ REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == proc->getSchedulingStrategy());
+ REQUIRE(3s == proc->getSchedulingPeriodNano());
+ REQUIRE(12s == proc->getPenalizationPeriod());
+ REQUIRE(4s == proc->getYieldPeriodMsec());
+ REQUIRE(proc->isAutoTerminated({"one", ""}));
+ REQUIRE(proc->isAutoTerminated({"two", ""}));
+ REQUIRE_FALSE(proc->isAutoTerminated({"three", ""}));
+ REQUIRE(proc->getProperty("File Size") == "10 B");
+ REQUIRE(proc->getProperty("Batch Size") == "12");
+
+ // verify funnel
+ auto* funnel = dynamic_cast<minifi::Funnel*>(flow->findProcessorByName("CoolFunnel"));
+ REQUIRE(funnel);
+ REQUIRE(funnel->getUUIDStr() == "00000000-0000-0000-0000-000000000010");
+
+ // verify RPG input port
+ auto* port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(flow->findProcessorByName("AmazingInputPort"));
+ REQUIRE(port);
+ REQUIRE(port->getUUIDStr() == "00000000-0000-0000-0000-000000000003");
+ REQUIRE(port->getMaxConcurrentTasks() == 7);
+ REQUIRE(port->getInstances().size() == 1);
+ REQUIRE(port->getInstances().front().host_ == "localhost");
+ REQUIRE(port->getInstances().front().port_ == 8090);
+ REQUIRE(port->getInstances().front().protocol_ == "https://");
+ REQUIRE(port->getProperty("Port UUID") == "00000000-0000-0000-0000-000000000005");
+
+ // verify connection
+ std::map<std::string, minifi::Connection*> connection_map;
+ flow->getConnections(connection_map);
+ REQUIRE(4 == connection_map.size());
+ auto connection1 = connection_map.at("00000000-0000-0000-0000-000000000002");
+ REQUIRE(connection1);
+ REQUIRE("GenToFunnel" == connection1->getName());
+ REQUIRE(connection1->getSource() == proc);
+ REQUIRE(connection1->getDestination() == funnel);
+ REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}}));
+ REQUIRE(connection1->getMaxQueueSize() == 7);
+ REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB);
+ REQUIRE(13s == connection1->getFlowExpirationDuration());
+
+ auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008");
+ REQUIRE(connection2);
+ REQUIRE("FunnelToS2S" == connection2->getName());
+ REQUIRE(connection2->getSource() == funnel);
+ REQUIRE(connection2->getDestination() == port);
+ REQUIRE(connection2->getRelationships() == (std::set<core::Relationship>{{"success", ""}}));
+}
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index b14ee1f83..651480578 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -28,17 +28,14 @@
#include "Catch.h"
#include "utils/TestUtils.h"
#include "utils/StringUtils.h"
+#include "ConfigurationTestController.h"
using namespace std::literals::chrono_literals;
TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
SECTION("loading YAML without optional component IDs works") {
static const std::string CONFIG_YAML_WITHOUT_IDS =
@@ -222,13 +219,9 @@ Provenance Reporting:
}
TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
static const std::string TEST_CONFIG_YAML =
R"(
@@ -347,13 +340,9 @@ NiFi Properties Overrides: {}
}
TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
static const std::string TEST_CONFIG_YAML =
R"(
@@ -496,17 +485,9 @@ NiFi Properties Overrides: {}
}
TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setTrace<core::YamlConfiguration>();
-
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
@@ -531,17 +512,9 @@ Processors:
}
TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
- TestController test_controller;
-
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
+ ConfigurationTestController test_controller;
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
@@ -567,25 +540,16 @@ Processors:
} catch (const std::exception &e) {
caught_exception = true;
REQUIRE("Unable to parse configuration file for component named 'XYZ' because required property "
- "'Input Directory' is not set [in 'Processors' section of configuration file]" == std::string(e.what()));
+ "'Input Directory' is not set [in '/Processors/0/Properties' section of configuration file]" == std::string(e.what()));
}
REQUIRE(caught_exception);
}
TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
- logTestController.setDebug<core::Processor>();
-
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
@@ -623,17 +587,9 @@ class DummyComponent : public core::ConfigurableComponent {
};
TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
- TestController test_controller;
-
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
+ ConfigurationTestController test_controller;
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -644,17 +600,9 @@ TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
}
TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
-
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "", false, "", { }, { }),
@@ -673,16 +621,9 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
}
TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
- TestController test_controller;
-
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ ConfigurationTestController test_controller;
+
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -693,15 +634,9 @@ TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
}
TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
- TestController test_controller;
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ ConfigurationTestController test_controller;
+
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -712,16 +647,9 @@ TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
}
TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]") {
- TestController test_controller;
-
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ ConfigurationTestController test_controller;
+
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -740,15 +668,8 @@ TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]")
}
TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
- TestController test_controller;
- LogTestController &logTestController = LogTestController::getInstance();
- logTestController.setDebug<TestPlan>();
- logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ ConfigurationTestController test_controller;
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -767,13 +688,9 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
}
TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
- TestController test_controller;
+ ConfigurationTestController test_controller;
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration});
+ core::YamlConfiguration yamlConfig(test_controller.getContext());
static const std::string CONFIG_YAML_WITH_FUNNEL =
R"(
@@ -858,12 +775,8 @@ Remote Process Groups: []
}
TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
- TestController test_controller;
- std::shared_ptr<core::Repository> test_flow_file_repo = core::createRepository("flowfilerepository");
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yaml_config({test_flow_file_repo, content_repo, stream_factory, configuration});
+ ConfigurationTestController test_controller;
+ core::YamlConfiguration yaml_config(test_controller.getContext());
for (char i = '1'; i <= '8'; ++i) {
DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 354e944e2..dc51d0148 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -60,7 +60,7 @@ if (NOT OPENSSL_OFF)
set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
endif()
-file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" " [...]
# manually add this as it might not yet be present when this executes
list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index 6cf1778ce..3e8cee690 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -160,6 +160,10 @@ class RemoteProcessorGroupPort : public core::Processor {
}
}
+ std::vector<RPG> getInstances() const {
+ return nifi_instances_;
+ }
+
void setHTTPProxy(const utils::HTTPProxy &proxy) {
this->proxy_ = proxy;
}
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index af43f8e02..89b1aa2cd 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -60,8 +60,6 @@ enum ProcessGroupType {
REMOTE_PROCESS_GROUP,
};
-#define ONSCHEDULE_RETRY_INTERVAL 30000 // millisecs
-
class ProcessGroup : public CoreComponent {
friend struct ::ProcessGroupTestAccessor;
public:
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index fee9bd7e9..d454a3f69 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -30,11 +30,11 @@ namespace minifi {
namespace core {
-#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN"
-#define DEFAULT_SCHEDULING_PERIOD_STR "1 sec"
+constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"};
+constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"};
constexpr std::chrono::milliseconds DEFAULT_SCHEDULING_PERIOD_MILLIS{1000};
constexpr std::chrono::nanoseconds DEFAULT_RUN_DURATION{0};
-#define DEFAULT_MAX_CONCURRENT_TASKS 1
+constexpr int DEFAULT_MAX_CONCURRENT_TASKS{1};
constexpr std::chrono::seconds DEFAULT_YIELD_PERIOD_SECONDS{1};
constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
diff --git a/libminifi/include/core/json/JsonConfiguration.h b/libminifi/include/core/flow/AdaptiveConfiguration.h
similarity index 55%
rename from libminifi/include/core/json/JsonConfiguration.h
rename to libminifi/include/core/flow/AdaptiveConfiguration.h
index 35623381d..67a4c5885 100644
--- a/libminifi/include/core/json/JsonConfiguration.h
+++ b/libminifi/include/core/flow/AdaptiveConfiguration.h
@@ -15,37 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
-#include <memory>
-#include <optional>
#include <string>
-#include <unordered_set>
+#include <memory>
-#include "core/FlowConfiguration.h"
-#include "core/logging/LoggerConfiguration.h"
-#include "core/ProcessorConfig.h"
-#include "Exception.h"
-#include "io/StreamFactory.h"
-#include "io/validation.h"
-#include "sitetosite/SiteToSite.h"
-#include "utils/Id.h"
-#include "utils/StringUtils.h"
-#include "utils/file/FileSystem.h"
-#include "core/flow/StructuredConfiguration.h"
+#include "StructuredConfiguration.h"
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi::core::flow {
-class JsonConfiguration : public flow::StructuredConfiguration {
+class AdaptiveConfiguration : public StructuredConfiguration {
public:
- explicit JsonConfiguration(ConfigurationContext ctx);
-
- ~JsonConfiguration() override = default;
-
- std::unique_ptr<core::ProcessGroup> getRoot() override;
+ explicit AdaptiveConfiguration(ConfigurationContext ctx);
- std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &json_config) override;
+ std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &payload) override;
};
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/CheckRequiredField.h b/libminifi/include/core/flow/CheckRequiredField.h
index 6c35b0cbc..11dc2b1e1 100644
--- a/libminifi/include/core/flow/CheckRequiredField.h
+++ b/libminifi/include/core/flow/CheckRequiredField.h
@@ -27,7 +27,7 @@
namespace org::apache::nifi::minifi::core::flow {
bool isFieldPresent(const Node &node, std::string_view field_name);
-std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section = "");
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names);
/**
* This is a helper function for verifying the existence of a required
@@ -47,8 +47,8 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> &
* @throws std::invalid_argument if the required field 'field_name' is
* not present in 'node'
*/
-void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section = "", std::string_view error_message = "");
+void checkRequiredField(const Node &node, const std::vector<std::string>& field_name, std::string_view error_message = "");
-std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message = {});
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message = {});
} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h
new file mode 100644
index 000000000..0badac303
--- /dev/null
+++ b/libminifi/include/core/flow/FlowSchema.h
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <vector>
+#include <string>
+
+namespace org::apache::nifi::minifi::core::flow {
+
+struct FlowSchema {
+ using Keys = std::vector<std::string>;
+
+ Keys flow_header;
+ Keys root_group;
+
+ Keys processors;
+ Keys processor_properties;
+ Keys autoterminated_rels;
+ Keys max_concurrent_tasks;
+ Keys penalization_period;
+ Keys proc_yield_period;
+ Keys runduration_nanos;
+ Keys onschedule_retry_interval;
+
+ Keys connections;
+ Keys max_queue_size;
+ Keys max_queue_data_size;
+ Keys swap_threshold;
+ Keys source_id;
+ Keys source_name;
+ Keys destination_id;
+ Keys destination_name;
+ Keys flowfile_expiration;
+ Keys drop_empty;
+ Keys source_relationship;
+ Keys source_relationship_list;
+
+ Keys process_groups;
+ Keys process_group_version;
+ Keys scheduling_strategy;
+ Keys scheduling_period;
+ Keys name;
+ Keys identifier;
+ Keys type;
+ Keys controller_services;
+ Keys controller_service_properties;
+ Keys remote_process_group;
+ Keys provenance_reporting;
+ Keys provenance_reporting_port_uuid;
+ Keys provenance_reporting_batch_size;
+ Keys funnels;
+ Keys input_ports;
+ Keys output_ports;
+ Keys rpg_url;
+ Keys rpg_yield_period;
+ Keys rpg_timeout;
+ Keys rpg_local_network_interface;
+ Keys rpg_transport_protocol;
+ Keys rpg_proxy_host;
+ Keys rpg_proxy_user;
+ Keys rpg_proxy_password;
+ Keys rpg_proxy_port;
+ Keys rpg_input_ports;
+ Keys rpg_output_ports;
+ Keys rpg_port_properties;
+ Keys rpg_port_target_id;
+
+ static FlowSchema getDefault();
+ static FlowSchema getNiFiFlowJson();
+};
+
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/Node.h b/libminifi/include/core/flow/Node.h
index 78735f95d..1bd3ad7e2 100644
--- a/libminifi/include/core/flow/Node.h
+++ b/libminifi/include/core/flow/Node.h
@@ -25,6 +25,8 @@
#include <memory>
#include <utility>
#include "nonstd/expected.hpp"
+#include "utils/StringUtils.h"
+#include "utils/gsl.h"
namespace org::apache::nifi::minifi::core::flow {
@@ -37,6 +39,7 @@ class Node {
};
class Iterator {
+ friend class Node;
public:
class Value;
@@ -53,6 +56,7 @@ class Node {
Iterator& operator++() {
impl_->operator++();
+ ++idx_;
return *this;
}
@@ -75,6 +79,8 @@ class Node {
private:
std::unique_ptr<IteratorImpl> impl_;
+ std::string path_;
+ int idx_{0};
};
class NodeImpl {
@@ -88,6 +94,7 @@ class Node {
virtual nonstd::expected<bool, std::exception_ptr> getBool() const = 0;
virtual nonstd::expected<int64_t, std::exception_ptr> getInt64() const = 0;
virtual nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const = 0;
+ virtual nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const = 0;
virtual std::string getDebugString() const = 0;
@@ -98,6 +105,8 @@ class Node {
virtual std::optional<Cursor> getCursor() const = 0;
+ virtual Node createEmpty() const = 0;
+
virtual ~NodeImpl() = default;
};
@@ -113,6 +122,7 @@ class Node {
nonstd::expected<bool, std::exception_ptr> getBool() const {return impl_->getBool();}
nonstd::expected<int64_t, std::exception_ptr> getInt64() const {return impl_->getInt64();}
nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const {return impl_->getIntegerAsString();}
+ nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const {return impl_->getScalarAsString();}
// return a string representation of the node (need not to be deserializable)
std::string getDebugString() const {return impl_->getDebugString();}
@@ -121,14 +131,57 @@ class Node {
size_t empty() const {
return size() == 0;
}
- Iterator begin() const {return impl_->begin();}
- Iterator end() const {return impl_->end();}
- Node operator[](std::string_view key) const {return impl_->operator[](key);}
+ Iterator begin() const {
+ Iterator it = impl_->begin();
+ it.path_ = path_;
+ return it;
+ }
+ Iterator end() const {
+ Iterator it = impl_->end();
+ it.path_ = path_;
+ return it;
+ }
+
+ // considers @key to be a member of this node as is
+ Node getMember(std::string_view key) {
+ Node result = impl_->operator[](key);
+ result.path_ = utils::StringUtils::join_pack(path_, "/", key);
+ return result;
+ }
+
+ // considers @key to be a '/'-delimited access path
+ Node operator[](std::string_view key) const {
+ Node result = *this;
+ for (auto& field : utils::StringUtils::split(std::string{key}, "/")) {
+ if (key == ".") {
+ // pass: self
+ } else {
+ result = result.getMember(field);
+ }
+ if (!result) {
+ break;
+ }
+ }
+ return result;
+ }
+
+ // considers @keys to be a set of viable access paths, the first viable is returned
+ Node operator[](gsl::span<const std::string> keys) const {
+ for (auto& key : keys) {
+ if (Node result = (*this)[key]) {
+ return result;
+ }
+ }
+ return impl_->createEmpty();
+ }
+
+ std::string getPath() const {return path_;}
std::optional<Cursor> getCursor() const {return impl_->getCursor();}
private:
std::shared_ptr<NodeImpl> impl_;
+ std::string path_;
};
class Node::Iterator::Value : public Node, public std::pair<Node, Node> {
diff --git a/libminifi/include/core/flow/StructuredConfiguration.h b/libminifi/include/core/flow/StructuredConfiguration.h
index 1b46cda9d..762a2b352 100644
--- a/libminifi/include/core/flow/StructuredConfiguration.h
+++ b/libminifi/include/core/flow/StructuredConfiguration.h
@@ -21,6 +21,7 @@
#include <optional>
#include <string>
#include <unordered_set>
+#include <vector>
#include "core/FlowConfiguration.h"
#include "core/logging/LoggerConfiguration.h"
@@ -33,19 +34,10 @@
#include "utils/StringUtils.h"
#include "utils/file/FileSystem.h"
#include "core/flow/Node.h"
+#include "FlowSchema.h"
namespace org::apache::nifi::minifi::core::flow {
-static constexpr char const* CONFIG_FLOW_CONTROLLER_KEY = "Flow Controller";
-static constexpr char const* CONFIG_PROCESSORS_KEY = "Processors";
-static constexpr char const* CONFIG_CONTROLLER_SERVICES_KEY = "Controller Services";
-static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
-static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
-static constexpr char const* CONFIG_PROVENANCE_REPORT_KEY = "Provenance Reporting";
-static constexpr char const* CONFIG_FUNNELS_KEY = "Funnels";
-static constexpr char const* CONFIG_INPUT_PORTS_KEY = "Input Ports";
-static constexpr char const* CONFIG_OUTPUT_PORTS_KEY = "Output Ports";
-
class StructuredConfiguration : public FlowConfiguration {
public:
StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger);
@@ -60,6 +52,8 @@ class StructuredConfiguration : public FlowConfiguration {
*/
void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const;
+ std::unique_ptr<core::ProcessGroup> getRoot() override;
+
protected:
/**
* Returns a shared pointer to a ProcessGroup object containing the
@@ -70,7 +64,7 @@ class StructuredConfiguration : public FlowConfiguration {
* @return the root ProcessGroup node of the flow
* configuration tree
*/
- std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node);
+ std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node, FlowSchema schema);
std::unique_ptr<core::ProcessGroup> createProcessGroup(const Node& node, bool is_root = false);
@@ -99,7 +93,7 @@ class StructuredConfiguration : public FlowConfiguration {
* @param parent the parent ProcessGroup for the port
* @param direction the TransferDirection of the port
*/
- void parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
+ void parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
/**
* Parses the root level node for the flow configuration and
@@ -155,7 +149,7 @@ class StructuredConfiguration : public FlowConfiguration {
* @param properties_node the Node containing the properties
* @param processor the Processor to which to add the resulting properties
*/
- void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section);
+ void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name);
/**
* Parses the Funnels section of a configuration.
@@ -195,9 +189,9 @@ class StructuredConfiguration : public FlowConfiguration {
* is optional and defaults to 'id'
* @return the parsed or generated UUID string
*/
- std::string getOrGenerateId(const Node& node, const std::string& id_field = "id");
+ std::string getOrGenerateId(const Node& node);
- std::string getRequiredIdField(const Node& node, std::string_view section = "", std::string_view error_message = "");
+ std::string getRequiredIdField(const Node& node, std::string_view error_message = "");
/**
* This is a helper function for getting an optional value, if it exists.
@@ -213,7 +207,9 @@ class StructuredConfiguration : public FlowConfiguration {
* the optional field is missing. If not provided,
* a default info message will be generated.
*/
- std::string getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section = "", const std::string& info_message = "");
+ std::string getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message = "");
+
+ FlowSchema schema_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
std::unordered_set<std::string> uuids_;
diff --git a/libminifi/include/core/flow/StructuredConnectionParser.h b/libminifi/include/core/flow/StructuredConnectionParser.h
index 2b1d73743..6d7869ea6 100644
--- a/libminifi/include/core/flow/StructuredConnectionParser.h
+++ b/libminifi/include/core/flow/StructuredConnectionParser.h
@@ -26,18 +26,19 @@
#include "core/flow/Node.h"
#include "utils/gsl.h"
+#include "core/flow/FlowSchema.h"
namespace org::apache::nifi::minifi::core::flow {
class StructuredConnectionParser {
public:
- static constexpr const char* CONFIG_CONNECTIONS_KEY{ "Connections" };
-
- explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
+ explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent,
+ const std::shared_ptr<logging::Logger>& logger, std::optional<FlowSchema> schema = std::nullopt) :
connectionNode_(connectionNode),
name_(name),
parent_(parent),
- logger_(logger) {
+ logger_(logger),
+ schema_(schema.value_or(FlowSchema::getDefault())) {
if (!connectionNode.isMap()) {
throw std::logic_error("Connection node is not a map");
}
@@ -60,6 +61,7 @@ class StructuredConnectionParser {
const std::string& name_;
gsl::not_null<core::ProcessGroup*> parent_;
const std::shared_ptr<logging::Logger> logger_;
+ const FlowSchema schema_;
};
} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/json/JsonNode.h b/libminifi/include/core/json/JsonNode.h
index d19ca1582..bb11ca251 100644
--- a/libminifi/include/core/json/JsonNode.h
+++ b/libminifi/include/core/json/JsonNode.h
@@ -46,6 +46,10 @@ class JsonNode : public flow::Node::NodeImpl {
return node_ ? node_->IsNull() : false;
}
+ flow::Node createEmpty() const override {
+ return flow::Node{std::make_shared<JsonNode>(nullptr)};
+ }
+
nonstd::expected<std::string, std::exception_ptr> getString() const override {
try {
if (!node_) {
@@ -93,12 +97,27 @@ class JsonNode : public flow::Node::NodeImpl {
if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
if (node_->IsInt64()) return std::to_string(node_->GetInt64());
if (node_->IsUint64()) return std::to_string(node_->GetUint64());
+ if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength());
throw std::runtime_error("Cannot get string from non-integer json value");
} catch (...) {
return nonstd::make_unexpected(std::current_exception());
}
}
+ nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override {
+ try {
+ if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
+ if (node_->IsBool()) return node_->GetBool() ? "true" : "false";
+ if (node_->IsInt64()) return std::to_string(node_->GetInt64());
+ if (node_->IsUint64()) return std::to_string(node_->GetUint64());
+ if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength());
+ if (node_->IsDouble()) return std::to_string(node_->GetDouble());
+ throw std::runtime_error("Cannot convert non-scalar json value to string");
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
std::string getDebugString() const override {
if (!node_) return "<invalid>";
if (node_->IsObject()) return "<Map>";
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index 2262cd6e7..a98bb3bd6 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -49,7 +49,7 @@ class SchedulingDefaults : public DeviceInformation {
SerializedResponseNode defaultSchedulingStrategy;
defaultSchedulingStrategy.name = "defaultSchedulingStrategy";
- defaultSchedulingStrategy.value = DEFAULT_SCHEDULING_STRATEGY;
+ defaultSchedulingStrategy.value = core::DEFAULT_SCHEDULING_STRATEGY;
schedulingDefaults.children.push_back(defaultSchedulingStrategy);
@@ -67,7 +67,7 @@ class SchedulingDefaults : public DeviceInformation {
SerializedResponseNode defaultMaxConcurrentTasks;
defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks";
- defaultMaxConcurrentTasks.value = DEFAULT_MAX_CONCURRENT_TASKS;
+ defaultMaxConcurrentTasks.value = core::DEFAULT_MAX_CONCURRENT_TASKS;
schedulingDefaults.children.push_back(defaultMaxConcurrentTasks);
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 2b4a9d7d4..4d82ea394 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -44,15 +44,6 @@ class YamlConfiguration : public flow::StructuredConfiguration {
~YamlConfiguration() override = default;
- /**
- * Returns a shared pointer to a ProcessGroup object containing the
- * flow configuration.
- *
- * @return the root ProcessGroup node of the flow
- * configuration tree
- */
- std::unique_ptr<core::ProcessGroup> getRoot() override;
-
/**
* Returns a shared pointer to a ProcessGroup object containing the
* flow configuration. The yamlConfigStream argument must point to
diff --git a/libminifi/include/core/yaml/YamlNode.h b/libminifi/include/core/yaml/YamlNode.h
index ad8422f6e..193c61d55 100644
--- a/libminifi/include/core/yaml/YamlNode.h
+++ b/libminifi/include/core/yaml/YamlNode.h
@@ -49,6 +49,10 @@ class YamlNode : public flow::Node::NodeImpl {
return node_.IsNull();
}
+ flow::Node createEmpty() const override {
+ return flow::Node{std::make_shared<YamlNode>(YAML::Node{YAML::NodeType::Undefined})};
+ }
+
nonstd::expected<std::string, std::exception_ptr> getString() const override {
try {
return node_.as<std::string>();
@@ -81,6 +85,14 @@ class YamlNode : public flow::Node::NodeImpl {
}
}
+ nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override {
+ try {
+ return node_.as<std::string>();
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
std::string getDebugString() const override {
if (!node_) return "<invalid>";
if (node_.IsNull()) return "null";
@@ -149,11 +161,11 @@ class YamlIterator : public flow::Node::Iterator::IteratorImpl {
YAML::const_iterator it_;
};
-flow::Node::Iterator YamlNode::begin() const {
+inline flow::Node::Iterator YamlNode::begin() const {
return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.begin())};
}
-flow::Node::Iterator YamlNode::end() const {
+inline flow::Node::Iterator YamlNode::end() const {
return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.end())};
}
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 81e2bb1e0..277184996 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -29,7 +29,7 @@
#include "io/StreamFactory.h"
#include "core/yaml/YamlConfiguration.h"
-#include "core/json/JsonConfiguration.h"
+#include "core/flow/AdaptiveConfiguration.h"
namespace org::apache::nifi::minifi::core {
@@ -38,13 +38,7 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura
if (configuration_class_name) {
class_name_lc = configuration_class_name.value();
} else if (ctx.path) {
- if (utils::StringUtils::endsWith(ctx.path->string(), ".yml")) {
- class_name_lc = "yamlconfiguration";
- } else if (utils::StringUtils::endsWith(ctx.path->string(), ".json")) {
- class_name_lc = "jsonconfiguration";
- } else {
- throw std::runtime_error("Could not infer config type from file path");
- }
+ class_name_lc = "adaptiveconfiguration";
} else {
throw std::runtime_error("Neither configuration class nor config file path has been specified");
}
@@ -57,8 +51,8 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura
} else if (class_name_lc == "yamlconfiguration") {
// only load if the class is defined.
return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(ctx));
- } else if (class_name_lc == "jsonconfiguration") {
- return std::unique_ptr<core::JsonConfiguration>(instantiate<core::JsonConfiguration>(ctx));
+ } else if (class_name_lc == "adaptiveconfiguration") {
+ return std::unique_ptr<core::flow::AdaptiveConfiguration>(instantiate<core::flow::AdaptiveConfiguration>(ctx));
} else {
if (fail_safe) {
return std::make_unique<core::FlowConfiguration>(ctx);
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 7300d2129..509c0542d 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -33,6 +33,8 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::core {
+constexpr int DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS = 30000;  // value assigned to onschedule_retry_msec_ when no parent group supplies one; milliseconds per the name (30 s)
+
std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid)
@@ -54,7 +56,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils:
if (parent_process_group_ != nullptr) {
onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod();
} else {
- onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
+ onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
}
transmitting_ = false;
transport_protocol_ = "RAW";
@@ -69,7 +71,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name)
parent_process_group_(nullptr),
logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
yield_period_msec_ = 0ms;
- onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
+ onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
transmitting_ = false;
transport_protocol_ = "RAW";
diff --git a/libminifi/src/core/flow/AdaptiveConfiguration.cpp b/libminifi/src/core/flow/AdaptiveConfiguration.cpp
new file mode 100644
index 000000000..cef220bb4
--- /dev/null
+++ b/libminifi/src/core/flow/AdaptiveConfiguration.cpp
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flow/AdaptiveConfiguration.h"
+
+#include "rapidjson/document.h"
+#include "core/json/JsonNode.h"
+#include "core/yaml/YamlNode.h"
+#include "yaml-cpp/yaml.h"
+#include "utils/file/FileUtils.h"
+#include "Defaults.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+AdaptiveConfiguration::AdaptiveConfiguration(ConfigurationContext ctx)  // Constructs the configuration; if ctx has no path, a default one is chosen before the base class is initialized.
+ : StructuredConfiguration(([&] {  // immediately-invoked lambda so ctx.path can be defaulted inside the initializer list
+ if (!ctx.path) {
+ if (utils::file::exists(DEFAULT_NIFI_CONFIG_JSON)) {  // the default JSON config is used when that file exists
+ ctx.path = DEFAULT_NIFI_CONFIG_JSON;
+ } else {
+ ctx.path = DEFAULT_NIFI_CONFIG_YML;  // otherwise fall back to the default YAML config path
+ }
+ }
+ return std::move(ctx);  // ctx is captured by reference (not a local of the lambda), so the explicit move avoids a copy
+ })(),
+ logging::LoggerFactory<AdaptiveConfiguration>::getLogger()) {}
+
+std::unique_ptr<core::ProcessGroup> AdaptiveConfiguration::getRootFromPayload(const std::string &payload) {  // Builds the root process group from raw flow text: JSON is tried first, YAML is the fallback.
+ try {
+ rapidjson::Document doc;
+ rapidjson::ParseResult res = doc.Parse(payload.c_str(), payload.length());
+ if (res) {  // payload parsed as JSON
+ flow::Node root{std::make_shared<JsonNode>(&doc)};  // NOTE(review): the node is handed &doc, a local; presumably fine because parsing finishes before doc goes out of scope - confirm nothing retains the node
+ if (root[FlowSchema::getDefault().flow_header]) {  // the default schema's header key is present, so this is the default schema written as JSON
+ logger_->log_debug("Processing configuration as default json");
+ return getRootFrom(root, FlowSchema::getDefault());
+ } else {  // header key absent: interpret the document with the NiFi flow JSON schema
+ logger_->log_debug("Processing configuration as nifi flow json");
+ return getRootFrom(root, FlowSchema::getNiFiFlowJson());
+ }
+ }
+ logger_->log_debug("Could not parse configuration as json, trying yaml");
+ YAML::Node rootYamlNode = YAML::Load(payload);  // presumably throws on malformed YAML, which lands in the catch below - confirm against yaml-cpp docs
+ flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+ return getRootFrom(root, FlowSchema::getDefault());  // YAML configs always use the default schema
+ } catch(...) {
+ logger_->log_error("Invalid configuration file");
+ throw;  // rethrow the original exception after logging
+ }
+}
+
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/CheckRequiredField.cpp b/libminifi/src/core/flow/CheckRequiredField.cpp
index 6942a5844..4539d9d78 100644
--- a/libminifi/src/core/flow/CheckRequiredField.cpp
+++ b/libminifi/src/core/flow/CheckRequiredField.cpp
@@ -27,7 +27,7 @@ bool isFieldPresent(const Node &node, std::string_view field_name) {
return bool{node[field_name]};
}
-std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section) {
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names) {
const Node name_node = node["name"];
// Build a helpful error message for the user so they can fix the
// invalid config file, using the component name if present
@@ -36,32 +36,32 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> &
name_node ?
"Unable to parse configuration file for component named '" + name_node.getString().value() + "' as none of the possible required fields [" + field_list_string + "] is available" :
"Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available";
- if (!section.empty()) {
- err_msg += " [in '" + std::string(section) + "' section of configuration file]";
- }
+
+ err_msg += " [in '" + node.getPath() + "' section of configuration file]";
+
if (auto cursor = node.getCursor()) {
err_msg += " [line:column, pos at " + std::to_string(cursor->line) + ":" + std::to_string(cursor->column) + ", " + std::to_string(cursor->pos) + "]";
}
return err_msg;
}
-void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section, std::string_view error_message) {
- if (!isFieldPresent(node, field_name)) {
+void checkRequiredField(const Node &node, const std::vector<std::string>& field_names, std::string_view error_message) {  // Throws std::invalid_argument unless at least one of field_names is present on node.
+ if (std::none_of(field_names.begin(), field_names.end(), [&] (auto& field) {return isFieldPresent(node, field);})) {  // also true for an empty field_names list
if (error_message.empty()) {
- throw std::invalid_argument(buildErrorMessage(node, std::vector<std::string>{std::string(field_name)}, section));
+ throw std::invalid_argument(buildErrorMessage(node, field_names));  // no custom message supplied: generate one from the node and the candidate names
}
throw std::invalid_argument(error_message.data());  // NOTE(review): string_view::data() is not guaranteed to be null-terminated; std::string(error_message) would be safer
}
}
-std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message) {
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message) {  // Returns the string value of the first name in alternate_names present on node; throws std::invalid_argument if none is.
for (const auto& name : alternate_names) {
if (isFieldPresent(node, name)) {
return node[name].getString().value();  // first match wins, in the order the alternatives are listed
}
}
if (error_message.empty()) {
- throw std::invalid_argument(buildErrorMessage(node, alternate_names, section));
+ throw std::invalid_argument(buildErrorMessage(node, alternate_names));  // no custom message supplied: generate one from the node and the candidate names
}
throw std::invalid_argument(error_message.data());  // NOTE(review): string_view::data() is not guaranteed to be null-terminated; std::string(error_message) would be safer
}
diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp
new file mode 100644
index 000000000..a5df0dc64
--- /dev/null
+++ b/libminifi/src/core/flow/FlowSchema.cpp
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/flow/FlowSchema.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+FlowSchema FlowSchema::getDefault() {  // Key names of the default flow schema; each entry lists the accepted key name(s) for one field.
+ return FlowSchema{
+ .flow_header = {"Flow Controller"},
+ .root_group = {"."},  // presumably "." resolves to the node itself, i.e. the root group's content sits at the top level - confirm in Node lookup
+
+ .processors = {"Processors"},
+ .processor_properties = {"Properties"},
+ .autoterminated_rels = {"auto-terminated relationships list"},
+ .max_concurrent_tasks = {"max concurrent tasks"},
+ .penalization_period = {"penalization period"},
+ .proc_yield_period = {"yield period"},
+ .runduration_nanos = {"run duration nanos"},
+ .onschedule_retry_interval = {"onschedule retry interval"},
+
+ .connections = {"Connections"},
+ .max_queue_size = {"max work queue size"},
+ .max_queue_data_size = {"max work queue data size"},
+ .swap_threshold = {"swap threshold"},
+ .source_id = {"source id"},
+ .source_name = {"source name"},
+ .destination_id = {"destination id"},
+ .destination_name = {"destination name"},
+ .flowfile_expiration = {"flowfile expiration"},
+ .drop_empty = {"drop empty"},
+ .source_relationship = {"source relationship name"},
+ .source_relationship_list = {"source relationship names"},
+
+ .process_groups = {"Process Groups"},
+ .process_group_version = {"version"},
+ .scheduling_strategy = {"scheduling strategy"},
+ .scheduling_period = {"scheduling period"},
+ .name = {"name"},
+ .identifier = {"id"},
+ .type = {"class", "type"},  // two alternative key names are listed for this field
+ .controller_services = {"Controller Services"},
+ .controller_service_properties = {"Properties"},
+ .remote_process_group = {"Remote Processing Groups", "Remote Process Groups"},  // two alternative spellings are listed for this field
+ .provenance_reporting = {"Provenance Reporting"},
+ .provenance_reporting_port_uuid = {"port uuid"},
+ .provenance_reporting_batch_size = {"batch size"},
+ .funnels = {"Funnels"},
+ .input_ports = {"Input Ports"},
+ .output_ports = {"Output Ports"},
+
+ .rpg_url = {"url"},
+ .rpg_yield_period = {"yield period"},
+ .rpg_timeout = {"timeout"},
+ .rpg_local_network_interface = {"local network interface"},
+ .rpg_transport_protocol = {"transport protocol"},
+ .rpg_proxy_host = {"proxy host"},
+ .rpg_proxy_user = {"proxy user"},
+ .rpg_proxy_password = {"proxy password"},
+ .rpg_proxy_port = {"proxy port"},
+ .rpg_input_ports = {"Input Ports"},
+ .rpg_output_ports = {"Output Ports"},
+ .rpg_port_properties = {"Properties"},
+ .rpg_port_target_id = {}  // empty list: the default schema names no key for this field
+ };
+}
+
+FlowSchema FlowSchema::getNiFiFlowJson() {  // Key names of the NiFi flow JSON schema; an empty list means the schema names no key for that field.
+ return FlowSchema{
+ .flow_header = {"rootGroup"},  // header and root group are the same "rootGroup" object in this schema
+ .root_group = {"rootGroup"},
+ .processors = {"processors"},
+ .processor_properties = {"properties"},
+ .autoterminated_rels = {"autoTerminatedRelationships"},
+ .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"},
+ .penalization_period = {"penaltyDuration"},
+ .proc_yield_period = {"yieldDuration"},
+ // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue
+ .runduration_nanos = {"runDurationMillis"},
+ .onschedule_retry_interval = {},
+
+ .connections = {"connections"},
+ .max_queue_size = {"backPressureObjectThreshold"},
+ .max_queue_data_size = {"backPressureDataSizeThreshold"},
+ .swap_threshold = {},
+ .source_id = {"source/id"},  // presumably '/' denotes a nested lookup (source -> id) - confirm in Node lookup
+ .source_name = {"source/name"},
+ .destination_id = {"destination/id"},
+ .destination_name = {"destination/name"},
+ .flowfile_expiration = {"flowFileExpiration"},
+ // contrary to nifi we support dropEmpty in flow json as well
+ .drop_empty = {"dropEmpty"},
+ .source_relationship = {},
+ .source_relationship_list = {"selectedRelationships"},
+
+ .process_groups = {"processGroups"},
+ .process_group_version = {},
+ .scheduling_strategy = {"schedulingStrategy"},
+ .scheduling_period = {"schedulingPeriod"},
+ .name = {"name"},
+ .identifier = {"identifier"},
+ .type = {"type"},
+ .controller_services = {"controllerServices"},
+ .controller_service_properties = {"properties"},
+ .remote_process_group = {"remoteProcessGroups"},
+ .provenance_reporting = {},
+ .provenance_reporting_port_uuid = {},
+ .provenance_reporting_batch_size = {},
+ .funnels = {"funnels"},
+ .input_ports = {"inputPorts"},
+ .output_ports = {"outputPorts"},
+
+ .rpg_url = {"targetUri"},
+ .rpg_yield_period = {"yieldDuration"},
+ .rpg_timeout = {"communicationsTimeout"},
+ .rpg_local_network_interface = {"localNetworkInterface"},
+ .rpg_transport_protocol = {"transportProtocol"},
+ .rpg_proxy_host = {"proxyHost"},
+ .rpg_proxy_user = {"proxyUser"},
+ .rpg_proxy_password = {"proxyPassword"},
+ .rpg_proxy_port = {"proxyPort"},
+ .rpg_input_ports = {"inputPorts"},
+ .rpg_output_ports = {"outputPorts"},
+ .rpg_port_properties = {},
+ .rpg_port_target_id = {"targetId"}
+ };
+}
+
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/Node.cpp b/libminifi/src/core/flow/Node.cpp
index 7fd7b0df8..1c751ebe0 100644
--- a/libminifi/src/core/flow/Node.cpp
+++ b/libminifi/src/core/flow/Node.cpp
@@ -21,7 +21,15 @@
namespace org::apache::nifi::minifi::core::flow {
Node::Iterator::Value Node::Iterator::operator*() const {  // Dereferences the underlying iterator and records the yielded node's path as parent path + "/" + index or key.
- return impl_->operator*();
+ Value value = impl_->operator*();
+ if (value) {
+ // sequence iterator
+ value.path_ = utils::StringUtils::join_pack(path_, "/", std::to_string(idx_));  // element path uses the iterator's idx_
+ } else if (value.second) {
+ // map iterator
+ value.second.path_ = utils::StringUtils::join_pack(path_, "/", value.first.getString().value());  // entry path uses the key's string form; .value() presumably throws if the key is not a string - confirm
+ }
+ return value;
}
} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 7e1d41e63..02624a4dc 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -34,13 +34,26 @@ namespace org::apache::nifi::minifi::core::flow {
std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRoot() {  // Reads the configured flow file and parses it; throws if no path is set, returns nullptr if the file cannot be read.
+ if (!config_path_) {
+ logger_->log_error("Cannot instantiate flow, no config file is set.");
+ throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
+ }
+ const auto configuration = filesystem_->read(config_path_.value());  // empty result means the content could not be obtained
+ if (!configuration) {
+ // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
+ return nullptr;
+ }
+ return getRootFromPayload(configuration.value());  // format-specific parsing is delegated to getRootFromPayload
+}
+
StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger)
: FlowConfiguration(std::move(ctx)),
logger_(std::move(logger)) {}
std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) {  // Parses the top-level process group and stores its name in name_.
- auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY];
- auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true);
+ checkRequiredField(root_flow_node, schema_.flow_header);  // throws std::invalid_argument when the header key is missing
+ auto root_group = parseProcessGroup(root_flow_node[schema_.flow_header], root_flow_node[schema_.root_group], true);  // header node and group-content node are both looked up through the active schema
this->name_ = root_group->getName();
return root_group;
}
@@ -48,15 +61,15 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGro
std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) {
int version = 0;
- checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
- auto flowName = node["name"].getString().value();
+ checkRequiredField(node, schema_.name);
+ auto flowName = node[schema_.name].getString().value();
utils::Identifier uuid;
// assignment throws on invalid uuid
uuid = getOrGenerateId(node);
- if (node["version"]) {
- version = gsl::narrow<int>(node["version"].getInt64().value());
+ if (node[schema_.process_group_version]) {
+ version = gsl::narrow<int>(node[schema_.process_group_version].getInt64().value());
}
logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
@@ -67,8 +80,8 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(
group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
}
- if (node["onschedule retry interval"]) {
- auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value();
+ if (node[schema_.onschedule_retry_interval]) {
+ auto onScheduleRetryPeriod = node[schema_.onschedule_retry_interval].getString().value();
logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
@@ -83,20 +96,13 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(
std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& header_node, const Node& node, bool is_root) {
auto group = createProcessGroup(header_node, is_root);
- Node processorsNode = node[CONFIG_PROCESSORS_KEY];
- Node connectionsNode = node[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY];
- Node funnelsNode = node[CONFIG_FUNNELS_KEY];
- Node inputPortsNode = node[CONFIG_INPUT_PORTS_KEY];
- Node outputPortsNode = node[CONFIG_OUTPUT_PORTS_KEY];
- Node remoteProcessingGroupsNode = [&] {
- // assignment is not supported on invalid nodes
- Node candidate = node[CONFIG_REMOTE_PROCESS_GROUP_KEY];
- if (candidate) {
- return candidate;
- }
- return node[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3];
- }();
- Node childProcessGroupNodeSeq = node["Process Groups"];
+ Node processorsNode = node[schema_.processors];
+ Node connectionsNode = node[schema_.connections];
+ Node funnelsNode = node[schema_.funnels];
+ Node inputPortsNode = node[schema_.input_ports];
+ Node outputPortsNode = node[schema_.output_ports];
+ Node remoteProcessingGroupsNode = node[schema_.remote_process_group];
+ Node childProcessGroupNodeSeq = node[schema_.process_groups];
parseProcessorNode(processorsNode, group.get());
parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get());
@@ -115,10 +121,11 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(c
return group;
}
-std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node, FlowSchema schema) {
+ schema_ = std::move(schema);
uuids_.clear();
- Node controllerServiceNode = root_node[CONFIG_CONTROLLER_SERVICES_KEY];
- Node provenanceReportNode = root_node[CONFIG_PROVENANCE_REPORT_KEY];
+ Node controllerServiceNode = root_node[schema_.root_group][schema_.controller_services];
+ Node provenanceReportNode = root_node[schema_.provenance_reporting];
parseControllerServices(controllerServiceNode);
// Create the root process group
@@ -157,14 +164,14 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
for (const auto& procNode : processors_node) {
core::ProcessorConfig procCfg;
- checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY);
- procCfg.name = procNode["name"].getString().value();
+ checkRequiredField(procNode, schema_.name);
+ procCfg.name = procNode[schema_.name].getString().value();
procCfg.id = getOrGenerateId(procNode);
uuid = procCfg.id;
logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
- checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY);
- procCfg.javaClass = procNode["class"].getString().value();
+ checkRequiredField(procNode, schema_.type);
+ procCfg.javaClass = procNode[schema_.type].getString().value();
logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
// Determine the processor name only from the Java class
@@ -187,36 +194,35 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
- procCfg.schedulingStrategy = getOptionalField(procNode, "scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY);
+ procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY);
logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
- procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY);
+ procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR);
logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
- if (auto tasksNode = procNode["max concurrent tasks"]) {
+ if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) {
procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
}
- if (procNode["penalization period"]) {
- procCfg.penalizationPeriod = procNode["penalization period"].getString().value();
+ if (auto penalizationNode = procNode[schema_.penalization_period]) {
+ procCfg.penalizationPeriod = penalizationNode.getString().value();
logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
}
- if (procNode["yield period"]) {
- procCfg.yieldPeriod = procNode["yield period"].getString().value();
+ if (auto yieldNode = procNode[schema_.proc_yield_period]) {
+ procCfg.yieldPeriod = yieldNode.getString().value();
logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
}
- if (auto runNode = procNode["run duration nanos"]) {
+ if (auto runNode = procNode[schema_.runduration_nanos]) {
procCfg.runDurationNanos = runNode.getIntegerAsString().value();
logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
}
// handle auto-terminated relationships
- if (procNode["auto-terminated relationships list"]) {
- Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+ if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) {
std::vector<std::string> rawAutoTerminatedRelationshipValues;
if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
for (const auto& autoTerminatedRel : autoTerminatedSequence) {
@@ -227,9 +233,8 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
}
// handle processor properties
- if (procNode["Properties"]) {
- Node propertiesNode = procNode["Properties"];
- parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY);
+ if (Node propertiesNode = procNode[schema_.processor_properties]) {
+ parsePropertiesNode(propertiesNode, *processor, procCfg.name);
}
// Take care of scheduling
@@ -304,13 +309,13 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
return;
}
for (const auto& currRpgNode : rpg_node_seq) {
- checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
- auto name = currRpgNode["name"].getString().value();
+ checkRequiredField(currRpgNode, schema_.name);
+ auto name = currRpgNode[schema_.name].getString().value();
id = getOrGenerateId(currRpgNode);
logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id);
- auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ auto url = getOptionalField(currRpgNode, schema_.rpg_url, "");
logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url);
@@ -318,8 +323,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
auto group = createRemoteProcessGroup(name, uuid);
group->setParent(parentGroup);
- if (currRpgNode["yield period"]) {
- auto yieldPeriod = currRpgNode["yield period"].getString().value();
+ if (currRpgNode[schema_.rpg_yield_period]) {
+ auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value();
logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod);
auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
@@ -329,8 +334,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
}
}
- if (currRpgNode["timeout"]) {
- auto timeout = currRpgNode["timeout"].getString().value();
+ if (currRpgNode[schema_.rpg_timeout]) {
+ auto timeout = currRpgNode[schema_.rpg_timeout].getString().value();
logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout);
auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
@@ -340,33 +345,33 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
}
}
- if (currRpgNode["local network interface"]) {
- auto interface = currRpgNode["local network interface"].getString().value();
+ if (currRpgNode[schema_.rpg_local_network_interface]) {
+ auto interface = currRpgNode[schema_.rpg_local_network_interface].getString().value();
logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", interface);
group->setInterface(interface);
}
- if (currRpgNode["transport protocol"]) {
- auto transport_protocol = currRpgNode["transport protocol"].getString().value();
+ if (currRpgNode[schema_.rpg_transport_protocol]) {
+ auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value();
logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol);
if (transport_protocol == "HTTP") {
group->setTransportProtocol(transport_protocol);
- if (currRpgNode["proxy host"]) {
- auto http_proxy_host = currRpgNode["proxy host"].getString().value();
+ if (currRpgNode[schema_.rpg_proxy_host]) {
+ auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value();
logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host);
group->setHttpProxyHost(http_proxy_host);
- if (currRpgNode["proxy user"]) {
- auto http_proxy_username = currRpgNode["proxy user"].getString().value();
+ if (currRpgNode[schema_.rpg_proxy_user]) {
+ auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value();
logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username);
group->setHttpProxyUserName(http_proxy_username);
}
- if (currRpgNode["proxy password"]) {
- auto http_proxy_password = currRpgNode["proxy password"].getString().value();
+ if (currRpgNode[schema_.rpg_proxy_password]) {
+ auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value();
logger_->log_debug("parseRemoteProcessGroup: proxy password => [%s]", http_proxy_password);
group->setHttpProxyPassWord(http_proxy_password);
}
- if (currRpgNode["proxy port"]) {
- auto http_proxy_port = currRpgNode["proxy port"].getIntegerAsString().value();
+ if (currRpgNode[schema_.rpg_proxy_port]) {
+ auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value();
int32_t port;
if (core::Property::StringToInt(http_proxy_port, port)) {
logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port);
@@ -386,19 +391,19 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq,
group->setTransmitting(true);
group->setURL(url);
- checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY);
- auto inputPorts = currRpgNode["Input Ports"];
+ checkRequiredField(currRpgNode, schema_.rpg_input_ports);
+ auto inputPorts = currRpgNode[schema_.rpg_input_ports];
if (inputPorts && inputPorts.isSequence()) {
for (const auto& currPort : inputPorts) {
- parsePort(currPort, group.get(), sitetosite::SEND);
+ parseRPGPort(currPort, group.get(), sitetosite::SEND);
} // for node
}
- auto outputPorts = currRpgNode["Output Ports"];
+ auto outputPorts = currRpgNode[schema_.rpg_output_ports];
if (outputPorts && outputPorts.isSequence()) {
for (const auto& currPort : outputPorts) {
logger_->log_debug("Got a current port, iterating...");
- parsePort(currPort, group.get(), sitetosite::RECEIVE);
+ parseRPGPort(currPort, group.get(), sitetosite::RECEIVE);
} // for node
}
parentGroup->addProcessGroup(std::move(group));
@@ -420,10 +425,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
auto reportTask = createProvenanceReportTask();
- checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY);
- auto schedulingStrategyStr = node["scheduling strategy"].getString().value();
- checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY);
- auto schedulingPeriodStr = node["scheduling period"].getString().value();
+ checkRequiredField(node, schema_.scheduling_strategy);
+ auto schedulingStrategyStr = node[schema_.scheduling_strategy].getString().value();
+ checkRequiredField(node, schema_.scheduling_period);
+ auto schedulingPeriodStr = node[schema_.scheduling_period].getString().value();
if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
@@ -456,10 +461,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
}
}
- checkRequiredField(node, "port uuid", CONFIG_PROVENANCE_REPORT_KEY);
- auto portUUIDStr = node["port uuid"].getString().value();
- checkRequiredField(node, "batch size", CONFIG_PROVENANCE_REPORT_KEY);
- auto batchSizeStr = node["batch size"].getString().value();
+ checkRequiredField(node, schema_.provenance_reporting_port_uuid);
+ auto portUUIDStr = node[schema_.provenance_reporting_port_uuid].getString().value();
+ checkRequiredField(node, schema_.provenance_reporting_batch_size);
+ auto batchSizeStr = node[schema_.provenance_reporting_batch_size].getString().value();
logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
port_uuid = portUUIDStr;
@@ -481,9 +486,9 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
return;
}
for (const auto& service_node : controller_services_node) {
- checkRequiredField(service_node, "name", CONFIG_CONTROLLER_SERVICES_KEY);
+ checkRequiredField(service_node, schema_.name);
- auto type = getRequiredField(service_node, std::vector<std::string>{"class", "type"}, CONFIG_CONTROLLER_SERVICES_KEY);
+ auto type = getRequiredField(service_node, schema_.type);
logger_->log_debug("Using type %s for controller service node", type);
std::string fullType = type;
@@ -493,8 +498,8 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
type = type.substr(lastOfIdx);
}
- auto name = service_node["name"].getString().value();
- auto id = getRequiredIdField(service_node, CONFIG_CONTROLLER_SERVICES_KEY);
+ auto name = service_node[schema_.name].getString().value();
+ auto id = getRequiredIdField(service_node);
utils::Identifier uuid;
uuid = id;
@@ -502,11 +507,11 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
if (nullptr != controller_service_node) {
logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
controller_service_node->initialize();
- if (Node propertiesNode = service_node["Properties"]) {
+ if (Node propertiesNode = service_node[schema_.controller_service_properties]) {
// we should propagate properties to the node and to the implementation
- parsePropertiesNode(propertiesNode, *controller_service_node, name, CONFIG_CONTROLLER_SERVICES_KEY);
+ parsePropertiesNode(propertiesNode, *controller_service_node, name);
if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
- parsePropertiesNode(propertiesNode, *controllerServiceImpl, name, CONFIG_CONTROLLER_SERVICES_KEY);
+ parsePropertiesNode(propertiesNode, *controllerServiceImpl, name);
}
}
} else {
@@ -538,7 +543,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
// Default name to be same as ID
// If name is specified in configuration, use the value
- const auto name = connection_node["name"].getString().value_or(id);
+ const auto name = connection_node[schema_.name].getString().value_or(id);
const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
logger_->log_debug("Incorrect connection UUID format.");
@@ -547,7 +552,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
auto connection = createConnection(name, uuid.value());
logger_->log_debug("Created connection with UUID %s and name %s", id, name);
- const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+ const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_);
connectionParser.configureConnectionSourceRelationships(*connection);
connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
@@ -561,7 +566,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c
}
}
-void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
+void StructuredConfiguration::parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
utils::Identifier uuid;
if (!parent) {
@@ -570,9 +575,9 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
}
// Check for required fields
- checkRequiredField(port_node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
- auto nameStr = port_node["name"].getString().value();
- auto portId = getRequiredIdField(port_node, CONFIG_REMOTE_PROCESS_GROUP_KEY,
+ checkRequiredField(port_node, schema_.name);
+ auto nameStr = port_node[schema_.name].getString().value();
+ auto portId = getRequiredIdField(port_node,
"The field 'id' is required for "
"the port named '" + nameStr + "' in the Flow Config. If this port "
"is an input port for a NiFi Remote Process Group, the port "
@@ -597,8 +602,11 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
// else defaults to RAW
// handle port properties
- if (Node propertiesNode = port_node["Properties"]) {
- parsePropertiesNode(propertiesNode, *port, nameStr, CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ if (Node propertiesNode = port_node[schema_.rpg_port_properties]) {
+ parsePropertiesNode(propertiesNode, *port, nameStr);
+ } else {
+ parsePropertyNodeElement(minifi::RemoteProcessorGroupPort::portUUID.getName(), port_node[schema_.rpg_port_target_id], *port);
+ validateComponentProperties(*port, nameStr, port_node.getPath());
}
// add processor to parent
@@ -606,7 +614,7 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou
parent->addProcessor(std::move(port));
processor.setScheduledState(core::RUNNING);
- if (auto tasksNode = port_node["max concurrent tasks"]) {
+ if (auto tasksNode = port_node[schema_.max_concurrent_tasks]) {
std::string rawMaxConcurrentTasks = tasksNode.getIntegerAsString().value();
int32_t maxConcurrentTasks;
if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
@@ -658,7 +666,7 @@ PropertyValue StructuredConfiguration::getValidatedProcessorPropertyForDefaultTy
} else if (defaultType == Value::BOOL_TYPE && property_value_node.getBool()) {
coercedValue = property_value_node.getBool().value();
} else {
- coercedValue = property_value_node.getString().value();
+ coercedValue = property_value_node.getScalarAsString().value();
}
return coercedValue;
} catch (const std::exception& e) {
@@ -687,7 +695,7 @@ void StructuredConfiguration::parseSingleProperty(const std::string& property_na
throw;
}
if (!property_set) {
- const auto rawValueString = property_value_node.getString().value();
+ const auto rawValueString = property_value_node.getScalarAsString().value();
auto proc = dynamic_cast<core::Connectable*>(&processor);
if (proc) {
logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
@@ -714,7 +722,7 @@ void StructuredConfiguration::parsePropertyNodeElement(const std::string& proper
}
}
-void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section) {
+void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name) {
// Treat generically as a node so we can perform inspection on entries to ensure they are populated
logger_->log_trace("Entered %s", component_name);
for (const auto& property_node : properties_node) {
@@ -723,7 +731,7 @@ void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, c
parsePropertyNodeElement(propertyName, propertyValueNode, component);
}
- validateComponentProperties(component, component_name, section);
+ validateComponentProperties(component, component_name, properties_node.getPath());
}
void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* parent) {
@@ -739,7 +747,7 @@ void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup*
std::string id = getOrGenerateId(funnel_node);
// Default name to be same as ID
- const auto name = funnel_node["name"].getString().value_or(id);
+ const auto name = funnel_node[schema_.name].getString().value_or(id);
const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
logger_->log_debug("Incorrect funnel UUID format.");
@@ -767,7 +775,7 @@ void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGr
std::string id = getOrGenerateId(port_node);
// Default name to be same as ID
- const auto name = port_node["name"].getString().value_or(id);
+ const auto name = port_node[schema_.name].getString().value_or(id);
const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
logger_->log_debug("Incorrect port UUID format.");
@@ -861,14 +869,14 @@ void StructuredConfiguration::raiseComponentError(const std::string &component_n
throw std::invalid_argument(err_msg);
}
-std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std::string& id_field) {
- if (node[id_field]) {
- if (auto opt_id_str = node[id_field].getString()) {
+std::string StructuredConfiguration::getOrGenerateId(const Node& node) {
+ if (node[schema_.identifier]) {
+ if (auto opt_id_str = node[schema_.identifier].getString()) {
auto id = opt_id_str.value();
addNewId(id);
return id;
}
- throw std::invalid_argument("getOrGenerateId: idField '" + id_field + "' is expected to contain string.");
+ throw std::invalid_argument("getOrGenerateId: idField '" + utils::StringUtils::join(",", schema_.identifier) + "' is expected to contain string.");
}
auto id = id_generator_->generate().to_string();
@@ -876,27 +884,24 @@ std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std
return id;
}
-std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view section, std::string_view error_message) {
- checkRequiredField(node, "id", section, error_message);
- auto id = node["id"].getString().value();
+// Returns the identifier stored on `node` (looked up through schema_.identifier) and records it via addNewId().
+// checkRequiredField() runs first with the caller-supplied error_message; presumably it throws when the
+// identifier is missing -- its implementation is not visible here, confirm before relying on it.
+std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view error_message) {
+ checkRequiredField(node, schema_.identifier, error_message);
+ auto id = node[schema_.identifier].getString().value();
addNewId(id);
return id;
}
-std::string StructuredConfiguration::getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section,
- const std::string& info_message) {
+// Looks up an optional field on `node`; field_name lists the candidate key names
+// (presumably alternatives resolved by Node::operator[] -- confirm against Node.h).
+// When the lookup yields nothing and no info_message was supplied, a message is assembled from the
+// field name(s), the component's "name" entry (if any), the node path and default_value.
+std::string StructuredConfiguration::getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message) {
std::string infoMessage = info_message;
auto result = node[field_name];
if (!result) {
if (infoMessage.empty()) {
// Build a helpful info message for the user to inform them that a default is being used
- infoMessage =
- node["name"] ?
- "Using default value for optional field '" + field_name + "' in component named '" + node["name"].getString().value() + "'" :
- "Using default value for optional field '" + field_name + "' ";
- if (!section.empty()) {
- infoMessage += " [in '" + section + "' section of configuration file]: ";
+ infoMessage = "Using default value for optional field '" + utils::StringUtils::join(",", field_name) + "'";
+ if (auto name = node["name"]) {
+ // the field name is already closed with a quote above, so continue with a plain space (no second ')
+ infoMessage += " in component named '" + name.getString().value() + "'";
}
+ infoMessage += " [in '" + node.getPath() + "' section of configuration file]: ";
infoMessage += default_value;
}
diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
index f39a3ac8b..a6521884b 100644
--- a/libminifi/src/core/flow/StructuredConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -49,10 +49,9 @@ void StructuredConnectionParser::addFunnelRelationshipToConnection(minifi::Conne
void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::Connection& connection) const {
// Configure connection source
- if (connectionNode_["source relationship name"] && !connectionNode_["source relationship name"].getString().value().empty()) {
- addNewRelationshipToConnection(connectionNode_["source relationship name"].getString().value(), connection);
- } else if (connectionNode_["source relationship names"]) {
- auto relList = connectionNode_["source relationship names"];
+ if (connectionNode_[schema_.source_relationship] && !connectionNode_[schema_.source_relationship].getString().value().empty()) {
+ addNewRelationshipToConnection(connectionNode_[schema_.source_relationship].getString().value(), connection);
+ } else if (auto relList = connectionNode_[schema_.source_relationship_list]) {
if (relList.isSequence() && !relList.empty()) {
for (const auto &rel : relList) {
addNewRelationshipToConnection(rel.getString().value(), connection);
@@ -68,7 +67,7 @@ void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::
}
uint64_t StructuredConnectionParser::getWorkQueueSize() const {
- if (auto max_work_queue_data_size_node = connectionNode_["max work queue size"]) {
+ if (auto max_work_queue_data_size_node = connectionNode_[schema_.max_queue_size]) {
std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
uint64_t max_work_queue_size;
if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
@@ -81,7 +80,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const {
}
uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
- const flow::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+ const flow::Node max_work_queue_data_size_node = connectionNode_[schema_.max_queue_data_size];
if (max_work_queue_data_size_node) {
std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
uint64_t max_work_queue_data_size = 0;
@@ -95,7 +94,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
}
uint64_t StructuredConnectionParser::getSwapThreshold() const {
- const flow::Node swap_threshold_node = connectionNode_["swap threshold"];
+ const flow::Node swap_threshold_node = connectionNode_[schema_.swap_threshold];
if (swap_threshold_node) {
auto swap_threshold_str = swap_threshold_node.getString().value();
uint64_t swap_threshold;
@@ -109,7 +108,7 @@ uint64_t StructuredConnectionParser::getSwapThreshold() const {
}
utils::Identifier StructuredConnectionParser::getSourceUUID() const {
- const flow::Node source_id_node = connectionNode_["source id"];
+ const flow::Node source_id_node = connectionNode_[schema_.source_id];
if (source_id_node) {
const auto srcUUID = utils::Identifier::parse(source_id_node.getString().value());
if (srcUUID) {
@@ -120,8 +119,8 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const {
throw std::invalid_argument("Invalid source id");
}
// if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
- checkRequiredField(connectionNode_, "source name", CONFIG_CONNECTIONS_KEY);
- const auto connectionSrcProcName = connectionNode_["source name"].getString().value();
+ checkRequiredField(connectionNode_, schema_.source_name);
+ const auto connectionSrcProcName = connectionNode_[schema_.source_name].getString().value();
const auto srcUUID = utils::Identifier::parse(connectionSrcProcName);
if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
// the source name is a remote port id, so use that as the source id
@@ -141,7 +140,7 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const {
}
utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
- const flow::Node destination_id_node = connectionNode_["destination id"];
+ const flow::Node destination_id_node = connectionNode_[schema_.destination_id];
if (destination_id_node) {
const auto destUUID = utils::Identifier::parse(destination_id_node.getString().value());
if (destUUID) {
@@ -153,8 +152,8 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
}
// we use the same logic as above for resolving the source processor
// for looking up the destination processor in absence of a processor id
- checkRequiredField(connectionNode_, "destination name", CONFIG_CONNECTIONS_KEY);
- auto connectionDestProcName = connectionNode_["destination name"].getString().value();
+ checkRequiredField(connectionNode_, schema_.destination_name);
+ auto connectionDestProcName = connectionNode_[schema_.destination_name].getString().value();
const auto destUUID = utils::Identifier::parse(connectionDestProcName);
if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
// the destination name is a remote port id, so use that as the dest id
@@ -175,7 +174,7 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() const {
using namespace std::literals::chrono_literals;
- const flow::Node expiration_node = connectionNode_["flowfile expiration"];
+ const flow::Node expiration_node = connectionNode_[schema_.flowfile_expiration];
if (!expiration_node) {
logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
return 0ms;
@@ -196,7 +195,7 @@ std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() co
}
bool StructuredConnectionParser::getDropEmpty() const {
- const flow::Node drop_empty_node = connectionNode_["drop empty"];
+ const flow::Node drop_empty_node = connectionNode_[schema_.drop_empty];
if (drop_empty_node) {
return utils::StringUtils::toBool(drop_empty_node.getString().value()).value_or(false);
}
diff --git a/libminifi/src/core/json/JsonConfiguration.cpp b/libminifi/src/core/json/JsonConfiguration.cpp
deleted file mode 100644
index 960f9a631..000000000
--- a/libminifi/src/core/json/JsonConfiguration.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <memory>
-#include <vector>
-#include <set>
-#include <cinttypes>
-#include <variant>
-
-#include "core/json/JsonConfiguration.h"
-#include "core/json/JsonNode.h"
-#include "core/state/Value.h"
-#include "Defaults.h"
-#include "utils/TimeUtil.h"
-
-#include "rapidjson/rapidjson.h"
-#include "rapidjson/document.h"
-
-namespace org::apache::nifi::minifi::core {
-
-namespace {
-
-} // namespace
-
-
-JsonConfiguration::JsonConfiguration(ConfigurationContext ctx)
- : StructuredConfiguration(([&] {
- if (!ctx.path) {
- ctx.path = DEFAULT_NIFI_CONFIG_JSON;
- }
- return std::move(ctx);
- })(),
- logging::LoggerFactory<JsonConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRoot() {
- if (!config_path_) {
- logger_->log_error("Cannot instantiate flow, no config file is set.");
- throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
- }
- const auto configuration = filesystem_->read(config_path_.value());
- if (!configuration) {
- // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
- return nullptr;
- }
- try {
- rapidjson::Document doc;
- rapidjson::ParseResult res = doc.Parse(configuration->c_str(), configuration->length());
- if (!res) {
- throw std::runtime_error("Could not parse json file");
- }
- flow::Node root{std::make_shared<JsonNode>(&doc)};
- return getRootFrom(root);
- } catch(...) {
- logger_->log_error("Invalid json configuration file");
- throw;
- }
-}
-
-std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRootFromPayload(const std::string &json_config) {
- try {
- rapidjson::Document doc;
- rapidjson::ParseResult res = doc.Parse(json_config.c_str(), json_config.length());
- if (!res) {
- throw std::runtime_error("Could not parse json file");
- }
- flow::Node root{std::make_shared<JsonNode>(&doc)};
- return getRootFrom(root);
- } catch (const std::runtime_error& err) {
- logger_->log_error(err.what());
- throw;
- }
-}
-
-} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 89092eee5..f7bde6a66 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -40,31 +40,11 @@ YamlConfiguration::YamlConfiguration(ConfigurationContext ctx)
})(),
logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRoot() {
- if (!config_path_) {
- logger_->log_error("Cannot instantiate flow, no config file is set.");
- throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
- }
- const auto configuration = filesystem_->read(config_path_.value());
- if (!configuration) {
- // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
- return nullptr;
- }
- try {
- YAML::Node rootYamlNode = YAML::Load(configuration.value());
- flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
- return getRootFrom(root);
- } catch(...) {
- logger_->log_error("Invalid yaml configuration file");
- throw;
- }
-}
-
std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(std::istream &yamlConfigStream) {
try {
YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
- return getRootFrom(root);
+ return getRootFrom(root, flow::FlowSchema::getDefault());
} catch (const YAML::ParserException &pe) {
logger_->log_error(pe.what());
throw;
@@ -75,7 +55,7 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRootFromPayload(const
try {
YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
- return getRootFrom(root);
+ return getRootFrom(root, flow::FlowSchema::getDefault());
} catch (const YAML::ParserException &pe) {
logger_->log_error(pe.what());
throw;
diff --git a/libminifi/test/ConfigurationTestController.h b/libminifi/test/ConfigurationTestController.h
new file mode 100644
index 000000000..05e33214d
--- /dev/null
+++ b/libminifi/test/ConfigurationTestController.h
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "TestBase.h"
+#include "core/FlowConfiguration.h"
+#include "core/RepositoryFactory.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "core/flow/AdaptiveConfiguration.h"
+
+// Test fixture for flow-configuration tests; getContext() hands its members out as a core::ConfigurationContext.
+class ConfigurationTestController : public TestController {
+ public:
+  ConfigurationTestController()
+      : flow_file_repo_(core::createRepository("flowfilerepository")),
+        configuration_(std::make_shared<minifi::Configure>()),
+        stream_factory_(minifi::io::StreamFactory::getInstance(configuration_)),
+        content_repo_(std::make_shared<core::repository::VolatileContentRepository>()) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setTrace<core::YamlConfiguration>();
+    LogTestController::getInstance().setDebug<core::Processor>();
+    LogTestController::getInstance().setTrace<core::flow::AdaptiveConfiguration>();
+  }
+
+  // Bundles the fixture's shared members into a context object for constructing a flow configuration.
+  core::ConfigurationContext getContext() const {
+    return core::ConfigurationContext{
+      .flow_file_repo = flow_file_repo_, .content_repo = content_repo_,
+      .stream_factory = stream_factory_, .configuration = configuration_
+    };
+  }
+
+  // NOTE: stream_factory_ is built from configuration_, so configuration_ has to stay declared before it.
+  std::shared_ptr<core::Repository> flow_file_repo_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;
+  std::shared_ptr<core::ContentRepository> content_repo_;
+};
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 63ee396b2..a3ed6a483 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -232,7 +232,7 @@ int main(int argc, char **argv) {
std::string graceful_shutdown_seconds;
std::string prov_repo_class = "provenancerepository";
std::string flow_repo_class = "flowfilerepository";
- std::string nifi_configuration_class_name = "yamlconfiguration";
+ std::string nifi_configuration_class_name = "adaptiveconfiguration";
std::string content_repo_class = "filesystemrepository";
auto log_properties = std::make_shared<core::logging::LoggerProperties>();