You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/12/19 11:41:14 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1846 - Json configuration parsing
This is an automated email from the ASF dual-hosted git repository.
martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ced398819 MINIFICPP-1846 - Json configuration parsing
ced398819 is described below
commit ced398819e053c642279e620efb55d79b736fef5
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Mon Dec 19 12:17:50 2022 +0100
MINIFICPP-1846 - Json configuration parsing
Closes #1391
Signed-off-by: Martin Zink <ma...@apache.org>
---
controller/Controller.h | 5 +-
extensions/coap/tests/CoapIntegrationBase.h | 4 +-
extensions/http-curl/tests/C2PauseResumeTest.cpp | 4 +-
.../tests/ControllerServiceIntegrationTests.cpp | 4 +-
extensions/http-curl/tests/VerifyInvokeHTTP.h | 2 +-
.../tests/unit/YamlConfigurationTests.cpp | 31 +-
.../tests/unit/YamlConnectionParserTest.cpp | 145 ++--
.../tests/unit/YamlProcessGroupParserTests.cpp | 2 +-
libminifi/CMakeLists.txt | 2 +-
libminifi/include/Defaults.h | 1 +
libminifi/include/core/ConfigurationFactory.h | 16 +-
libminifi/include/core/FlowConfiguration.h | 16 +-
.../core/{yaml => flow}/CheckRequiredField.h | 39 +-
libminifi/include/core/flow/Node.h | 139 +++
libminifi/include/core/flow/README.md | 57 ++
.../include/core/flow/StructuredConfiguration.h | 240 ++++++
.../StructuredConnectionParser.h} | 36 +-
libminifi/include/core/json/JsonConfiguration.h | 51 ++
libminifi/include/core/json/JsonNode.h | 233 +++++
libminifi/include/core/yaml/YamlConfiguration.h | 261 +-----
libminifi/include/core/yaml/YamlNode.h | 160 ++++
libminifi/include/utils/ValueCaster.h | 14 +-
libminifi/src/core/ConfigurationFactory.cpp | 48 +-
libminifi/src/core/FlowConfiguration.cpp | 24 +-
libminifi/src/core/flow/CheckRequiredField.cpp | 69 ++
.../{include/Defaults.h => src/core/flow/Node.cpp} | 15 +-
.../StructuredConfiguration.cpp} | 596 ++++++-------
.../StructuredConnectionParser.cpp} | 85 +-
libminifi/src/core/json/JsonConfiguration.cpp | 89 ++
libminifi/src/core/yaml/CheckRequiredField.cpp | 80 --
libminifi/src/core/yaml/YamlConfiguration.cpp | 948 +--------------------
libminifi/test/flow-tests/TestControllerWithFlow.h | 2 +-
libminifi/test/integration/IntegrationBase.h | 2 +-
.../test/integration/ProvenanceReportingTest.cpp | 4 +-
.../PersistableKeyValueStoreServiceTest.cpp | 2 +-
.../UnorderedMapKeyValueStoreServiceTest.cpp | 3 +-
.../test/persistence-tests/PersistenceTests.cpp | 4 +-
libminifi/test/rocksdb-tests/RepoTests.cpp | 2 +-
minifi_main/MiNiFiMain.cpp | 10 +-
39 files changed, 1620 insertions(+), 1825 deletions(-)
diff --git a/controller/Controller.h b/controller/Controller.h
index b30f79748..dd5c4119c 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -279,7 +279,8 @@ std::shared_ptr<org::apache::nifi::minifi::core::controller::ControllerService>
const auto stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(configuration);
- auto flow_configuration = org::apache::nifi::minifi::core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);
+ auto flow_configuration = org::apache::nifi::minifi::core::createFlowConfiguration(
+ org::apache::nifi::minifi::core::ConfigurationContext{prov_repo, flow_repo, content_repo, stream_factory, configuration}, nifi_configuration_class_name);
const auto controller = std::make_unique<org::apache::nifi::minifi::FlowController>(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo);
controller->load();
@@ -340,7 +341,7 @@ void printManifest(const std::shared_ptr<org::apache::nifi::minifi::Configure> &
const auto stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(configuration);
auto flow_configuration = org::apache::nifi::minifi::core::createFlowConfiguration(
- prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name);
+ org::apache::nifi::minifi::core::ConfigurationContext{prov_repo, flow_repo, content_repo, stream_factory, configuration}, nifi_configuration_class_name);
const auto controller = std::make_unique<org::apache::nifi::minifi::FlowController>(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo, "manifest");
controller->load();
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 43e97d17d..755e6fc7d 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -64,9 +64,9 @@ class CoapIntegrationBase : public IntegrationBase {
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- auto yaml_ptr = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ auto yaml_ptr = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
std::shared_ptr<core::ProcessGroup> pg{ yaml_config.getRoot() };
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index bc152618c..147bbdf05 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -134,13 +134,13 @@ int main(int argc, char **argv) {
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
- test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+ core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
std::make_shared<utils::file::FileSystem>(), []{});
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+ core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
auto root = yaml_config.getRoot();
const auto proc = root->findProcessorByName("invoke");
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index ff6dd5eb3..59a408552 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -68,7 +68,7 @@ int main(int argc, char **argv) {
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
- test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+ core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
const auto controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
content_repo,
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
disabled = false;
std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
+ core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file});
auto pg = yaml_config.getRoot();
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index 860bfd93a..5b917ddb5 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -91,7 +91,7 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- auto yaml_ptr = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, flow_yml_path);
+ auto yaml_ptr = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, flow_yml_path});
flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME);
flowController_->load();
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index f4fd27f29..e9c9fd4bf 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -39,7 +39,7 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
SECTION("loading YAML without optional component IDs works") {
static const std::string CONFIG_YAML_WITHOUT_IDS =
@@ -230,7 +230,7 @@ TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
static const std::string TEST_CONFIG_YAML =
R"(
@@ -356,7 +356,7 @@ TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
static const std::string TEST_CONFIG_YAML =
R"(
@@ -510,7 +510,7 @@ TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
@@ -546,7 +546,7 @@ TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
@@ -591,7 +591,7 @@ TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
@@ -640,7 +640,7 @@ TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -662,7 +662,7 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "", false, "", { }, { }),
@@ -680,7 +680,6 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]")
REQUIRE(config_failed);
}
-#ifdef YAML_CONFIGURATION_USE_REGEX
TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
TestController test_controller;
@@ -692,7 +691,7 @@ TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -712,7 +711,7 @@ TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -733,7 +732,7 @@ TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]")
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -761,7 +760,7 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
@@ -779,8 +778,6 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
REQUIRE(config_failed);
}
-#endif // YAML_CONFIGURATION_USE_REGEX
-
TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
TestController test_controller;
@@ -789,7 +786,7 @@ TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
+ core::YamlConfiguration yamlConfig({testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration});
static const std::string CONFIG_YAML_WITH_FUNNEL =
R"(
@@ -880,7 +877,7 @@ TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- core::YamlConfiguration yaml_config(test_prov_repo, test_flow_file_repo, content_repo, stream_factory, configuration);
+ core::YamlConfiguration yaml_config({test_prov_repo, test_flow_file_repo, content_repo, stream_factory, configuration});
for (char i = '1'; i <= '8'; ++i) {
DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
index 0276ddd90..44d346ed4 100644
--- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -16,21 +16,24 @@
* limitations under the License.
*/
-#include "core/yaml/YamlConnectionParser.h"
+#include "core/flow/StructuredConnectionParser.h"
#include "core/yaml/YamlConfiguration.h"
#include "TailFile.h"
#include "TestBase.h"
#include "Catch.h"
#include "utils/TestUtils.h"
+#include "core/yaml/YamlNode.h"
using namespace std::literals::chrono_literals;
namespace {
-using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::flow::StructuredConnectionParser;
using org::apache::nifi::minifi::core::YamlConfiguration;
using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+using org::apache::nifi::minifi::core::YamlNode;
+namespace flow = org::apache::nifi::minifi::core::flow;
TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") {
const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
@@ -53,25 +56,28 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
"- something_else\n" };
expectations = { { "success", "" }, { "failure", "" }, { "something_else", "" } };
}
- YAML::Node connection_node = YAML::Load(serialized_yaml);
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection);
+ YAML::Node yaml_node = YAML::Load(serialized_yaml);
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ yaml_connection_parser.configureConnectionSourceRelationships(*connection);
const std::set<core::Relationship>& relationships = connection->getRelationships();
REQUIRE(expectations == relationships);
}
SECTION("Queue size limits are read") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"max work queue size: 231\n"
"max work queue data size: 12 MB\n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
- REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml()); // 12 * 1024 * 1024 B
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ REQUIRE(231 == yaml_connection_parser.getWorkQueueSize());
+ REQUIRE(12_MiB == yaml_connection_parser.getWorkQueueDataSize());
}
SECTION("Queue swap threshold is read") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"swap threshold: 231\n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(231 == yaml_connection_parser.getSwapThresholdFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ REQUIRE(231 == yaml_connection_parser.getSwapThreshold());
}
SECTION("Source and destination names and uuids are read") {
const utils::Identifier expected_source_id = utils::generateUUID();
@@ -94,115 +100,120 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
"source name: TailFile_1\n"
"destination name: TailFile_2\n" };
}
- YAML::Node connection_node = YAML::Load(serialized_yaml);
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUIDFromYaml());
- REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUIDFromYaml());
+ YAML::Node yaml_node = YAML::Load(serialized_yaml);
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUID());
+ REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUID());
}
SECTION("Flow file expiration is read") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"flowfile expiration: 2 min\n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(2min == yaml_connection_parser.getFlowFileExpirationFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ REQUIRE(2min == yaml_connection_parser.getFlowFileExpiration());
}
SECTION("Drop empty value is read") {
SECTION("When config contains true value") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"drop empty: true\n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(true == yaml_connection_parser.getDropEmptyFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ REQUIRE(true == yaml_connection_parser.getDropEmpty());
}
SECTION("When config contains false value") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"drop empty: false\n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(false == yaml_connection_parser.getDropEmptyFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ REQUIRE(false == yaml_connection_parser.getDropEmpty());
}
}
SECTION("Errors are handled properly when configuration lines are missing") {
const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
SECTION("With empty configuration") {
- YAML::Node connection_node = YAML::Load(std::string(""));
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
- CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getSwapThresholdFromYaml());
- CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
- CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+ YAML::Node yaml_node = YAML::Load(std::string(""));
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+
+ CHECK_THROWS(StructuredConnectionParser(connection_node, "test_node", parent_ptr, logger));
}
SECTION("With a configuration that lists keys but has no assigned values") {
std::string serialized_yaml;
SECTION("Single relationship name left empty") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"source name: \n"
"destination name: \n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
// This seems incorrect, but we do not want to ruin backward compatibility
- CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
+ CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationships(*connection));
}
SECTION("List of relationship names contains empty item") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"source relationship names:\n"
"- \n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(*connection));
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationships(*connection));
}
SECTION("Source and destination lookup from via id") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"source id: \n"
"destination id: \n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
- CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ CHECK_THROWS(yaml_connection_parser.getSourceUUID());
+ CHECK_THROWS(yaml_connection_parser.getDestinationUUID());
}
SECTION("Source and destination lookup via name") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"source name: \n"
"destination name: \n" });
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
- CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ CHECK_THROWS(yaml_connection_parser.getSourceUUID());
+ CHECK_THROWS(yaml_connection_parser.getDestinationUUID());
}
SECTION("Queue limits and configuration") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"max work queue size: \n"
"max work queue data size: \n"
"swap threshold: \n"
"flowfile expiration: \n"
"drop empty: \n"});
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK(0 == yaml_connection_parser.getWorkQueueSizeFromYaml());
- CHECK(0 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
- CHECK(0 == yaml_connection_parser.getSwapThresholdFromYaml());
- CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
- CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ CHECK(0 == yaml_connection_parser.getWorkQueueSize());
+ CHECK(0 == yaml_connection_parser.getWorkQueueDataSize());
+ CHECK(0 == yaml_connection_parser.getSwapThreshold());
+ CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
+ CHECK(0 == yaml_connection_parser.getDropEmpty());
}
}
SECTION("With a configuration that has values of incorrect format") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"max work queue size: 2 KB\n"
"max work queue data size: 10 Incorrect\n"
"flowfile expiration: 12\n"
"drop empty: sup\n"});
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
// This seems incorrect, but we do not want to ruin backward compatibility
- CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
- CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+ CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSize());
+ CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSize());
+ CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpiration());
+ CHECK_NOTHROW(yaml_connection_parser.getDropEmpty());
}
SECTION("Known incorrect formats that behave strangely") {
- YAML::Node connection_node = YAML::Load(std::string {
+ YAML::Node yaml_node = YAML::Load(std::string {
"max work queue data size: 2 Baby Pandas (img, 20 MB) that are cared for by a group of 30 giraffes\n"
"flowfile expiration: 0\n"
"drop empty: NULL\n"});
- YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- CHECK(2 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
- CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
- CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
+ flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
+ StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+ CHECK(2 == yaml_connection_parser.getWorkQueueDataSize());
+ CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
+ CHECK(0 == yaml_connection_parser.getDropEmpty());
}
}
}
diff --git a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
index 0f5879587..4ad98d747 100644
--- a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -22,7 +22,7 @@
#include "IntegrationTestUtils.h"
#include "ProcessGroupTestUtils.h"
-static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>());
+static core::YamlConfiguration config({nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>()});
TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser]") {
auto pattern = Group("root")
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 049e5ef41..f4bbee6f8 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
endif()
-file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" " [...]
+file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" " [...]
# manually add this as it might not yet be present when this executes
list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp")
diff --git a/libminifi/include/Defaults.h b/libminifi/include/Defaults.h
index 0ee8eaa48..3e119e294 100644
--- a/libminifi/include/Defaults.h
+++ b/libminifi/include/Defaults.h
@@ -18,6 +18,7 @@
#pragma once
const std::filesystem::path DEFAULT_NIFI_CONFIG_YML = std::filesystem::path("conf") / "config.yml";
+const std::filesystem::path DEFAULT_NIFI_CONFIG_JSON = std::filesystem::path("conf") / "config.json";
const std::filesystem::path DEFAULT_NIFI_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi.properties";
const std::filesystem::path DEFAULT_LOG_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-log.properties";
const std::filesystem::path DEFAULT_UID_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-uid.properties";
diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h
index 1d648f211..c3efee5e1 100644
--- a/libminifi/include/core/ConfigurationFactory.h
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -23,6 +23,7 @@
#include <optional>
#include <string>
#include <type_traits>
+#include <utility>
#include "FlowConfiguration.h"
@@ -33,24 +34,15 @@ namespace minifi {
namespace core {
template<typename T>
-T* instantiate(
- const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo,
- const std::shared_ptr<core::ContentRepository> &content_repo, const std::shared_ptr<io::StreamFactory> &stream_factory,
- std::shared_ptr<Configure> configuration, const std::optional<std::string>& path,
- const std::shared_ptr<utils::file::FileSystem>& filesystem) {
- return new T(repo, flow_file_repo, content_repo, stream_factory, configuration, path, filesystem);
+T* instantiate(ConfigurationContext ctx) {
+ return new T(std::move(ctx));
}
/**
* Configuration factory is used to create a new FlowConfiguration
* object.
*/
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
- std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
- std::shared_ptr<io::StreamFactory> stream_factory, const std::string& configuration_class_name,
- const std::optional<std::string>& path = {}, std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(),
- bool fail_safe = false);
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const ConfigurationContext& ctx, const std::optional<std::string>& configuration_class_name, bool fail_safe = false);
} // namespace core
} // namespace minifi
diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h
index 36f111d50..4039d7224 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -22,6 +22,7 @@
#include <string>
#include <utility>
#include <vector>
+#include <filesystem>
#include "core/Core.h"
#include "Connection.h"
@@ -51,6 +52,16 @@ class static_initializers {
extern static_initializers &get_static_functions();
+struct ConfigurationContext {
+ std::shared_ptr<core::Repository> repo;
+ std::shared_ptr<core::Repository> flow_file_repo;
+ std::shared_ptr<core::ContentRepository> content_repo;
+ std::shared_ptr<io::StreamFactory> stream_factory;
+ std::shared_ptr<Configure> configuration;
+ std::optional<std::filesystem::path> path{std::nullopt};
+ std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()};
+};
+
/**
* Purpose: Flow configuration defines the mechanism
* by which we will configure our flow controller
@@ -61,10 +72,7 @@ class FlowConfiguration : public CoreComponent {
* Constructor that will be used for configuring
* the flow controller.
*/
- explicit FlowConfiguration(const std::shared_ptr<core::Repository>& repo, std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory,
- std::shared_ptr<Configure> configuration, const std::optional<std::filesystem::path>& path,
- std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>());
+ explicit FlowConfiguration(ConfigurationContext ctx);
~FlowConfiguration() override;
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h b/libminifi/include/core/flow/CheckRequiredField.h
similarity index 54%
rename from libminifi/include/core/yaml/CheckRequiredField.h
rename to libminifi/include/core/flow/CheckRequiredField.h
index 2f0e4b0eb..6c35b0cbc 100644
--- a/libminifi/include/core/yaml/CheckRequiredField.h
+++ b/libminifi/include/core/flow/CheckRequiredField.h
@@ -21,45 +21,34 @@
#include <memory>
#include <vector>
-#include "core/logging/LoggerFactory.h"
-#include "yaml-cpp/yaml.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/flow/Node.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace yaml {
+namespace org::apache::nifi::minifi::core::flow {
-bool isFieldPresent(const YAML::Node &yaml_node, std::string_view field_name);
-std::string buildErrorMessage(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_field_names, std::string_view yaml_section = "");
+bool isFieldPresent(const Node &node, std::string_view field_name);
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section = "");
/**
* This is a helper function for verifying the existence of a required
- * field in a YAML::Node object. If the field is not present, an error
+ * field in a Node object. If the field is not present, an error
* message will be logged and an std::invalid_argument exception will be
- * thrown indicating the absence of the required field in the YAML node.
+ * thrown indicating the absence of the required field in the node.
*
- * @param yaml_node the YAML node to check
+ * @param node the node to check
* @param field_name the required field key
- * @param yaml_section [optional] the top level section of the YAML config
- * for the yaml_node. This is used for generating a
+ * @param section [optional] the top level section of the config
+ * for the node. This is used for generating a
* useful error message for troubleshooting.
* @param error_message [optional] the error message string to use if
* the required field is missing. If not provided,
* a default error message will be generated.
*
* @throws std::invalid_argument if the required field 'field_name' is
- * not present in 'yaml_node'
+ * not present in 'node'
*/
-void checkRequiredField(
- const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section = "", std::string_view error_message = "");
+void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section = "", std::string_view error_message = "");
-std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string_view error_message = {});
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message = {});
-} // namespace yaml
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/Node.h b/libminifi/include/core/flow/Node.h
new file mode 100644
index 000000000..78735f95d
--- /dev/null
+++ b/libminifi/include/core/flow/Node.h
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string_view>
+#include <tuple>
+#include <optional>
+#include <string>
+#include <memory>
+#include <utility>
+#include "nonstd/expected.hpp"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+class Node {
+ public:
+ struct Cursor {
+ int line{0};
+ int column{0};
+ int pos{0};
+ };
+
+ class Iterator {
+ public:
+ class Value;
+
+ class IteratorImpl {
+ public:
+ virtual IteratorImpl& operator++() = 0;
+ virtual bool operator==(const IteratorImpl& other) const = 0;
+ virtual Value operator*() const = 0;
+ bool operator!=(const IteratorImpl& other) const {return !(*this == other);}
+
+ virtual std::unique_ptr<IteratorImpl> clone() const = 0;
+ virtual ~IteratorImpl() = default;
+ };
+
+ Iterator& operator++() {
+ impl_->operator++();
+ return *this;
+ }
+
+ explicit Iterator(std::unique_ptr<IteratorImpl> impl) : impl_(std::move(impl)) {}
+ Iterator(const Iterator& other): impl_(other.impl_->clone()) {}
+ Iterator(Iterator&&) = default;
+ Iterator& operator=(const Iterator& other) {
+ if (this == &other) {
+ return *this;
+ }
+ impl_ = other.impl_->clone();
+ return *this;
+ }
+ Iterator& operator=(Iterator&&) = default;
+
+ bool operator==(const Iterator& other) const {return impl_->operator==(*other.impl_);}
+ bool operator!=(const Iterator& other) const {return !(*this == other);}
+
+ Value operator*() const;
+
+ private:
+ std::unique_ptr<IteratorImpl> impl_;
+ };
+
+ class NodeImpl {
+ public:
+ virtual explicit operator bool() const = 0;
+ virtual bool isSequence() const = 0;
+ virtual bool isMap() const = 0;
+ virtual bool isNull() const = 0;
+
+ virtual nonstd::expected<std::string, std::exception_ptr> getString() const = 0;
+ virtual nonstd::expected<bool, std::exception_ptr> getBool() const = 0;
+ virtual nonstd::expected<int64_t, std::exception_ptr> getInt64() const = 0;
+ virtual nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const = 0;
+
+ virtual std::string getDebugString() const = 0;
+
+ virtual size_t size() const = 0;
+ virtual Iterator begin() const = 0;
+ virtual Iterator end() const = 0;
+ virtual Node operator[](std::string_view key) const = 0;
+
+ virtual std::optional<Cursor> getCursor() const = 0;
+
+ virtual ~NodeImpl() = default;
+ };
+
+ Node() = default;
+ explicit Node(std::shared_ptr<NodeImpl> impl): impl_(std::move(impl)) {}
+
+ explicit operator bool() const {return impl_ != nullptr && impl_->operator bool();}  // a default-constructed Node has no impl_ and is falsy
+ bool isSequence() const {return impl_->isSequence();}
+ bool isMap() const {return impl_->isMap();}
+ bool isNull() const {return impl_->isNull();}
+
+ nonstd::expected<std::string, std::exception_ptr> getString() const {return impl_->getString();}
+ nonstd::expected<bool, std::exception_ptr> getBool() const {return impl_->getBool();}
+ nonstd::expected<int64_t, std::exception_ptr> getInt64() const {return impl_->getInt64();}
+ nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const {return impl_->getIntegerAsString();}
+
+ // returns a string representation of the node (need not be deserializable)
+ std::string getDebugString() const {return impl_->getDebugString();}
+
+ size_t size() const {return impl_->size();}
+ bool empty() const {  // true when size() reports zero elements
+ return size() == 0;
+ }
+ Iterator begin() const {return impl_->begin();}
+ Iterator end() const {return impl_->end();}
+ Node operator[](std::string_view key) const {return impl_->operator[](key);}
+
+ std::optional<Cursor> getCursor() const {return impl_->getCursor();}
+
+ private:
+ std::shared_ptr<NodeImpl> impl_;
+};
+
+class Node::Iterator::Value : public Node, public std::pair<Node, Node> {
+ public:
+ Value(Node node, Node key, Node value): Node{std::move(node)}, std::pair<Node, Node>{std::move(key), std::move(value)} {}
+};
+
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/flow/README.md b/libminifi/include/core/flow/README.md
new file mode 100644
index 000000000..c62c71495
--- /dev/null
+++ b/libminifi/include/core/flow/README.md
@@ -0,0 +1,57 @@
+## Differences between JSON and YAML implementation
+
+### YAML
+
+The possible types of a `YAML::Node` are:
+* Undefined
+* Null
+* Map
+* Sequence
+* Scalar
+
+#### Undefined
+
+The result of querying any member of `Null`, querying non-existing members of `Map`,
+or non-existing indices of `Sequence`.
+
+Note that for `Map`s string conversion applies: `map[0]` could be valid, given a key `"0"`,
+while for `Sequence`s string index parsing does NOT happen: `seq["0"]`
+will return `Undefined` even if the sequence is non-empty.
+
+Querying or otherwise accessing an `Undefined` (other than `operator bool` or `IsDefined`) usually throws.
+
+#### Null
+
+The value of parsing an empty document, the value of a `Map` item with empty value,
+the value of an omitted `Sequence` item.
+
+```
+key1: # this is a Null
+key2: '' # this is a Scalar, the empty string
+arr:
+ - one
+ - # Null as well
+ - three
+```
+
+#### Scalar
+
+A string value; all conversions to numbers happen on the fly.
+
+### Conversions
+
+#### 1. `::as<std::string>`
+
+* `Null` --> `"null"`
+* `Scalar` --> the string value
+* others --> throws
+
+#### 2. `::as<YAML::Node>`
+
+It was used in multiple places; it seems to throw on `Undefined` and return itself for all
+other types.
+
+### JSON
+
+In contrast to these, JSON has real bools, numbers, and strings; there is no string-to-int
+conversion happening.
diff --git a/libminifi/include/core/flow/StructuredConfiguration.h b/libminifi/include/core/flow/StructuredConfiguration.h
new file mode 100644
index 000000000..1b46cda9d
--- /dev/null
+++ b/libminifi/include/core/flow/StructuredConfiguration.h
@@ -0,0 +1,240 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_set>
+
+#include "core/FlowConfiguration.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/ProcessorConfig.h"
+#include "Exception.h"
+#include "io/StreamFactory.h"
+#include "io/validation.h"
+#include "sitetosite/SiteToSite.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileSystem.h"
+#include "core/flow/Node.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+static constexpr char const* CONFIG_FLOW_CONTROLLER_KEY = "Flow Controller";
+static constexpr char const* CONFIG_PROCESSORS_KEY = "Processors";
+static constexpr char const* CONFIG_CONTROLLER_SERVICES_KEY = "Controller Services";
+static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
+static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
+static constexpr char const* CONFIG_PROVENANCE_REPORT_KEY = "Provenance Reporting";
+static constexpr char const* CONFIG_FUNNELS_KEY = "Funnels";
+static constexpr char const* CONFIG_INPUT_PORTS_KEY = "Input Ports";
+static constexpr char const* CONFIG_OUTPUT_PORTS_KEY = "Output Ports";
+
+class StructuredConfiguration : public FlowConfiguration {
+ public:
+ StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger);
+
+ /**
+ * Iterates all component property validation rules and checks that configured state
+ * is valid. If state is determined to be invalid, conf parsing ends and an error is raised.
+ *
+ * @param component the component whose property validation rules are checked
+ * @param component_name the name of the component being validated
+ * @param section the configuration section the component belongs to
+ */
+ void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const;
+
+ protected:
+ /**
+ * Returns a shared pointer to a ProcessGroup object containing the
+ * flow configuration.
+ *
+ * @param root_node a pointer to a Node object containing the root
+ * node of the parsed document
+ * @return the root ProcessGroup node of the flow
+ * configuration tree
+ */
+ std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node);
+
+ std::unique_ptr<core::ProcessGroup> createProcessGroup(const Node& node, bool is_root = false);
+
+ std::unique_ptr<core::ProcessGroup> parseProcessGroup(const Node& header_node, const Node& node, bool is_root = false);
+ /**
+ * Parses processors from its corresponding config node and adds
+ * them to a parent ProcessGroup. The processors_node argument must point
+ * to a Node containing the processors configuration. Processor
+ * objects will be created and added to the parent ProcessGroup specified
+ * by the parent argument.
+ *
+ * @param processors_node the Node containing the processor configuration
+ * @param parent the parent ProcessGroup to which the created
+ * Processor should be added
+ */
+ void parseProcessorNode(const Node& processors_node, core::ProcessGroup* parent);
+
+ /**
+ * Parses a port from its corresponding config node and adds
+ * it to a parent ProcessGroup. The port_node argument must point
+ * to a Node containing the port configuration. A RemoteProcessorGroupPort
+ * object will be created and added to the parent ProcessGroup specified
+ * by the parent argument.
+ *
+ * @param port_node the Node containing the port configuration
+ * @param parent the parent ProcessGroup for the port
+ * @param direction the TransferDirection of the port
+ */
+ void parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
+
+ /**
+ * Parses the root level node for the flow configuration and
+ * returns a ProcessGroup containing the tree of flow configuration
+ * objects.
+ *
+ * @param root_flow_node
+ * @return
+ */
+ std::unique_ptr<core::ProcessGroup> parseRootProcessGroup(const Node& root_flow_node);
+
+ void parseProcessorProperty(const Node& doc, const Node& node, std::shared_ptr<core::Processor> processor);
+
+ void parseControllerServices(const Node& controller_services_node);
+
+ /**
+ * Parses the Connections section of a configuration.
+ * The resulting Connections are added to the parent ProcessGroup.
+ *
+ * @param connection_node_seq the Node containing the Connections section
+ * of the configuration
+ * @param parent the root node of flow configuration to which
+ * to add the connections that are parsed
+ */
+ void parseConnection(const Node& connection_node_seq, core::ProcessGroup* parent);
+
+ /**
+ * Parses the Remote Process Group section of a configuration.
+ * The resulting Process Group is added to the parent ProcessGroup.
+ *
+ * @param rpg_node_seq the Node containing the Remote Process Group
+ * section of the configuration
+ * @param parent the root node of flow configuration to which
+ * to add the process groups that are parsed
+ */
+ void parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parent);
+
+ /**
+ * Parses the Provenance Reporting section of a configuration.
+ * The resulting Provenance Reporting processor is added to the
+ * parent ProcessGroup.
+ *
+ * @param report_node the Node containing the provenance
+ * reporting configuration
+ * @param parent_group the root node of flow configuration to which
+ * to add the provenance reporting config
+ */
+ void parseProvenanceReporting(const Node& report_node, core::ProcessGroup* parent_group);
+
+ /**
+ * A helper function to parse the Properties Node for a processor.
+ *
+ * @param properties_node the Node containing the properties
+ * @param component the component to which to add the resulting properties
+ */
+ void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section);
+
+ /**
+ * Parses the Funnels section of a configuration.
+ * The resulting Funnels are added to the parent ProcessGroup.
+ *
+ * @param node the Node containing the Funnels section
+ * of the configuration
+ * @param parent the root node of flow configuration to which
+ * to add the funnels that are parsed
+ */
+ void parseFunnels(const Node& node, core::ProcessGroup* parent);
+
+ /**
+ * Parses the Input/Output Ports section of a configuration.
+ * The resulting ports are added to the parent ProcessGroup.
+ *
+ * @param node the Node containing the Input/Output Ports section
+ * of the configuration
+ * @param parent the root node of flow configuration to which
+ * to add the ports that are parsed
+ */
+ void parsePorts(const flow::Node& node, core::ProcessGroup* parent, PortType port_type);
+
+ /**
+ * A helper function for parsing or generating optional id fields.
+ *
+ * In parsing flow configurations for config schema v1, the
+ * 'id' field of most component types that contains a UUID is optional.
+ * This function will check for the existence of the specified
+ * idField in the specified node. If present, the field will be parsed
+ * as a UUID and the UUID string will be returned. If not present, a
+ * random UUID string will be generated and returned.
+ *
+ * @param node a pointer to the Node that will be checked for the
+ * presence of an idField
+ * @param id_field the string of the name of the idField to check for. This
+ * is optional and defaults to 'id'
+ * @return the parsed or generated UUID string
+ */
+ std::string getOrGenerateId(const Node& node, const std::string& id_field = "id");
+
+ std::string getRequiredIdField(const Node& node, std::string_view section = "", std::string_view error_message = "");
+
+ /**
+ * This is a helper function for getting an optional value, if it exists.
+ * If it does not exist, returns the provided default value.
+ *
+ * @param node the flow node to check
+ * @param field_name the optional field key
+ * @param default_value the default value to use if field is not set
+ * @param section [optional] the top level section of the config
+ * for the node. This is used for generating a
+ * useful info message for troubleshooting.
+ * @param info_message [optional] the info message string to use if
+ * the optional field is missing. If not provided,
+ * a default info message will be generated.
+ */
+ std::string getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section = "", const std::string& info_message = "");
+
+ static std::shared_ptr<utils::IdGenerator> id_generator_;
+ std::unordered_set<std::string> uuids_;
+ std::shared_ptr<logging::Logger> logger_;
+
+ private:
+ PropertyValue getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& property_from_processor, const Node& property_value_node);
+ void parsePropertyValueSequence(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& component);
+ void parseSingleProperty(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor);
+ void parsePropertyNodeElement(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor);
+ void addNewId(const std::string& uuid);
+
+ /**
+ * Raises a human-readable configuration error for the given configuration component/section.
+ *
+ * @param component_name the name of the component the error refers to
+ * @param section the configuration section the error refers to
+ * @param reason the reason for the error
+ */
+ void raiseComponentError(const std::string &component_name, const std::string &section, const std::string &reason) const;
+};
+
+} // namespace org::apache::nifi::minifi::core::flow
+
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h b/libminifi/include/core/flow/StructuredConnectionParser.h
similarity index 61%
rename from libminifi/include/core/yaml/YamlConnectionParser.h
rename to libminifi/include/core/flow/StructuredConnectionParser.h
index e1c544d77..2b1d73743 100644
--- a/libminifi/include/core/yaml/YamlConnectionParser.h
+++ b/libminifi/include/core/flow/StructuredConnectionParser.h
@@ -24,38 +24,42 @@
#include "core/ProcessGroup.h"
#include "core/logging/LoggerFactory.h"
-#include "yaml-cpp/yaml.h"
+#include "core/flow/Node.h"
#include "utils/gsl.h"
-namespace org::apache::nifi::minifi::core::yaml {
+namespace org::apache::nifi::minifi::core::flow {
-class YamlConnectionParser {
+class StructuredConnectionParser {
public:
- static constexpr const char* CONFIG_YAML_CONNECTIONS_KEY{ "Connections" };
+ static constexpr const char* CONFIG_CONNECTIONS_KEY{ "Connections" };
- explicit YamlConnectionParser(const YAML::Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
+ explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) :
connectionNode_(connectionNode),
name_(name),
parent_(parent),
- logger_(logger) {}
+ logger_(logger) {
+ if (!connectionNode.isMap()) {
+ throw std::logic_error("Connection node is not a map");
+ }
+ }
- void configureConnectionSourceRelationshipsFromYaml(minifi::Connection& connection) const;
- [[nodiscard]] uint64_t getWorkQueueSizeFromYaml() const;
- [[nodiscard]] uint64_t getWorkQueueDataSizeFromYaml() const;
- [[nodiscard]] utils::Identifier getSourceUUIDFromYaml() const;
- [[nodiscard]] uint64_t getSwapThresholdFromYaml() const;
- [[nodiscard]] utils::Identifier getDestinationUUIDFromYaml() const;
- [[nodiscard]] std::chrono::milliseconds getFlowFileExpirationFromYaml() const;
- [[nodiscard]] bool getDropEmptyFromYaml() const;
+ void configureConnectionSourceRelationships(minifi::Connection& connection) const;
+ [[nodiscard]] uint64_t getWorkQueueSize() const;
+ [[nodiscard]] uint64_t getWorkQueueDataSize() const;
+ [[nodiscard]] uint64_t getSwapThreshold() const;
+ [[nodiscard]] utils::Identifier getSourceUUID() const;
+ [[nodiscard]] utils::Identifier getDestinationUUID() const;
+ [[nodiscard]] std::chrono::milliseconds getFlowFileExpiration() const;
+ [[nodiscard]] bool getDropEmpty() const;
private:
void addNewRelationshipToConnection(const std::string& relationship_name, minifi::Connection& connection) const;
void addFunnelRelationshipToConnection(minifi::Connection& connection) const;
- const YAML::Node& connectionNode_;
+ const Node& connectionNode_;
const std::string& name_;
gsl::not_null<core::ProcessGroup*> parent_;
const std::shared_ptr<logging::Logger> logger_;
};
-} // namespace org::apache::nifi::minifi::core::yaml
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/core/json/JsonConfiguration.h b/libminifi/include/core/json/JsonConfiguration.h
new file mode 100644
index 000000000..35623381d
--- /dev/null
+++ b/libminifi/include/core/json/JsonConfiguration.h
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_set>
+
+#include "core/FlowConfiguration.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/ProcessorConfig.h"
+#include "Exception.h"
+#include "io/StreamFactory.h"
+#include "io/validation.h"
+#include "sitetosite/SiteToSite.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileSystem.h"
+#include "core/flow/StructuredConfiguration.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class JsonConfiguration : public flow::StructuredConfiguration {  // JSON counterpart of YamlConfiguration: same base class, same two overrides
+ public:
+ explicit JsonConfiguration(ConfigurationContext ctx);  // context is taken by value; the definition lives out of line
+
+ ~JsonConfiguration() override = default;
+
+ std::unique_ptr<core::ProcessGroup> getRoot() override;  // takes no payload argument; declared only, implementation is not in this header
+
+ std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &json_config) override;  // receives the configuration text as a string
+};
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/json/JsonNode.h b/libminifi/include/core/json/JsonNode.h
new file mode 100644
index 000000000..d19ca1582
--- /dev/null
+++ b/libminifi/include/core/json/JsonNode.h
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "core/flow/Node.h"
+#include "rapidjson/document.h"
+#include "utils/gsl.h"
+#include "utils/ValueCaster.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class JsonNode : public flow::Node::NodeImpl {  // flow::Node backend over a non-owning rapidjson::Value pointer; nullptr stands for an invalid/missing node
+ public:
+ explicit JsonNode(const rapidjson::Value* node): node_(node) {}  // stores the pointer as-is; the json value is not copied
+
+ explicit operator bool() const override {  // true iff a json value is attached
+ return node_ != nullptr;
+ }
+ bool isSequence() const override {  // json arrays are the sequence type; an invalid node reports false
+ return node_ ? node_->IsArray() : false;
+ }
+ bool isMap() const override {  // json objects are the map type; an invalid node reports false
+ return node_ ? node_->IsObject() : false;
+ }
+ bool isNull() const override {  // an invalid (nullptr) node is not reported as null
+ return node_ ? node_->IsNull() : false;
+ }
+
+ nonstd::expected<std::string, std::exception_ptr> getString() const override {  // string value, or the failure captured as an exception_ptr
+ try {  // errors are thrown locally and converted into an unexpected value by the catch below
+ if (!node_) {
+ throw std::runtime_error("Cannot get string of invalid json value");
+ }
+ if (!node_->IsString()) {  // non-string values are rejected, nothing is converted to text here
+ throw std::runtime_error("Cannot get string of non-string json value");
+ }
+ return std::string{node_->GetString(), node_->GetStringLength()};  // built from pointer + explicit length
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ nonstd::expected<int64_t, std::exception_ptr> getInt64() const override {  // int64 value, or the failure captured as an exception_ptr
+ try {
+ if (!node_) {
+ throw std::runtime_error("Cannot get int64 of invalid json value");
+ }
+ if (!node_->IsInt64()) {  // only values rapidjson classifies as int64 are accepted
+ throw std::runtime_error("Cannot get int64 of non-int64 json value");
+ }
+ return node_->GetInt64();
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ nonstd::expected<bool, std::exception_ptr> getBool() const override {  // bool value, or the failure captured as an exception_ptr
+ try {
+ if (!node_) {
+ throw std::runtime_error("Cannot get bool of invalid json value");
+ }
+ if (!node_->IsBool()) {  // only json true/false are accepted
+ throw std::runtime_error("Cannot get bool of non-bool json value");
+ }
+ return node_->GetBool();
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const override {  // decimal text of an integer value, or the captured failure
+ try {
+ if (!node_) throw std::runtime_error("Cannot get string from invalid json value");
+ if (node_->IsInt64()) return std::to_string(node_->GetInt64());  // signed representation is tried first
+ if (node_->IsUint64()) return std::to_string(node_->GetUint64());  // otherwise fall back to the unsigned 64-bit representation
+ throw std::runtime_error("Cannot get string from non-integer json value");
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ std::string getDebugString() const override {  // short human-readable rendering; containers are shown as placeholders, not expanded
+ if (!node_) return "<invalid>";
+ if (node_->IsObject()) return "<Map>";
+ if (node_->IsArray()) return "<Array>";
+ if (node_->IsNull()) return "null";
+ if (auto int_str = getIntegerAsString()) {  // for non-integers this holds an error and control falls through
+ return int_str.value();
+ }
+ if (node_->IsTrue()) return "true";
+ if (node_->IsFalse()) return "false";
+ if (node_->IsDouble()) return std::to_string(node_->GetDouble());
+ if (node_->IsString()) return '"' + std::string(node_->GetString(), node_->GetStringLength()) + '"';
+ return "<unknown>";
+ }
+
+ size_t size() const override {  // element count of an array; throws std::runtime_error for anything else, objects included
+ if (!node_) {
+ throw std::runtime_error("Cannot get size of invalid json value");
+ }
+ if (!node_->IsArray()) {
+ throw std::runtime_error("Cannot get size of non-array json value");
+ }
+ return node_->Size();
+ }
+ flow::Node::Iterator begin() const override;  // defined after the iterator classes further down in this header
+ flow::Node::Iterator end() const override;  // defined after the iterator classes further down in this header
+
+ flow::Node operator[](std::string_view key) const override {  // member lookup by key; a missing member yields an invalid node
+ if (!node_ || node_->IsArray() || node_->IsNull()) {  // invalid, array and null nodes yield an invalid node instead of throwing
+ return flow::Node{std::make_shared<JsonNode>(nullptr)};
+ }
+ if (!node_->IsObject()) {  // the remaining non-object kinds are scalars
+ throw std::runtime_error("Cannot get member of scalar json value");
+ }
+ auto it = node_->FindMember(rapidjson::Value(rapidjson::StringRef(key.data(), key.length())));  // key is passed with an explicit length
+ if (it == node_->MemberEnd()) {
+ return flow::Node{std::make_shared<JsonNode>(nullptr)};
+ }
+ return flow::Node{std::make_shared<JsonNode>(&it->value)};  // the new node points into the same document
+ }
+
+ std::optional<flow::Node::Cursor> getCursor() const override {  // always empty: this implementation provides no cursor information
+ return std::nullopt;
+ }
+
+ private:
+ const rapidjson::Value* node_;  // non-owning; nullptr marks an invalid node
+};
+
+class JsonValueIterator : public flow::Node::Iterator::IteratorImpl {  // iterator implementation over the elements of a json array
+ public:
+ explicit JsonValueIterator(rapidjson::Value::ConstValueIterator it): it_(std::move(it)) {}
+
+ IteratorImpl& operator++() override {  // pre-increment: advances to the next array element
+ ++it_;
+ return *this;
+ }
+ bool operator==(const IteratorImpl& other) const override {
+ const auto* ptr = dynamic_cast<const JsonValueIterator*>(&other);  // only another JsonValueIterator can be compared
+ gsl_Expects(ptr);  // comparing with a different implementation is treated as a precondition violation
+ return it_ == ptr->it_;
+ }
+ flow::Node::Iterator::Value operator*() const override {
+ auto node = flow::Node{std::make_shared<JsonNode>(&*it_)};  // the array element itself
+ auto first = flow::Node{std::make_shared<JsonNode>(nullptr)};  // invalid: array elements have no member name
+ auto second = flow::Node{std::make_shared<JsonNode>(nullptr)};  // invalid: array elements have no member value
+ return {std::move(node), std::move(first), std::move(second)};
+ }
+
+ std::unique_ptr<IteratorImpl> clone() const override {  // independent copy at the same position
+ return std::make_unique<JsonValueIterator>(it_);
+ }
+
+ private:
+ rapidjson::Value::ConstValueIterator it_;
+};
+
+class JsonMemberIterator : public flow::Node::Iterator::IteratorImpl {  // iterator implementation over the members of a json object
+ public:
+ explicit JsonMemberIterator(rapidjson::Value::ConstMemberIterator it): it_(std::move(it)) {}
+
+ IteratorImpl& operator++() override {  // pre-increment: advances to the next member
+ ++it_;
+ return *this;
+ }
+ bool operator==(const IteratorImpl& other) const override {
+ const auto* ptr = dynamic_cast<const JsonMemberIterator*>(&other);  // only another JsonMemberIterator can be compared
+ gsl_Expects(ptr);  // comparing with a different implementation is treated as a precondition violation
+ return it_ == ptr->it_;
+ }
+ flow::Node::Iterator::Value operator*() const override {
+ auto node = flow::Node{std::make_shared<JsonNode>(nullptr)};  // invalid: a member is exposed only through first/second
+ auto first = flow::Node{std::make_shared<JsonNode>(&it_->name)};  // the member name
+ auto second = flow::Node{std::make_shared<JsonNode>(&it_->value)};  // the member value
+ return flow::Node::Iterator::Value(node, first, second);
+ }
+
+ std::unique_ptr<IteratorImpl> clone() const override {  // independent copy at the same position
+ return std::make_unique<JsonMemberIterator>(it_);
+ }
+
+ private:
+ rapidjson::Value::ConstMemberIterator it_;
+};
+
+inline flow::Node::Iterator JsonNode::begin() const {  // start iterator; the implementation is chosen by the json value kind
+ if (!node_) {
+ throw std::runtime_error("Cannot get begin of invalid json value");
+ }
+ if (node_->IsArray()) {  // arrays iterate over their elements
+ return flow::Node::Iterator{std::make_unique<JsonValueIterator>(node_->Begin())};
+ } else if (node_->IsObject()) {  // objects iterate over their members
+ return flow::Node::Iterator{std::make_unique<JsonMemberIterator>(node_->MemberBegin())};
+ } else {  // scalars and null cannot be iterated
+ throw std::runtime_error("Json node is not iterable, neither array nor object");
+ }
+}
+
+inline flow::Node::Iterator JsonNode::end() const {  // past-the-end iterator; uses the same kind dispatch as begin()
+ if (!node_) {
+ throw std::runtime_error("Cannot get end of invalid json value");
+ }
+ if (node_->IsArray()) {  // arrays: past the last element
+ return flow::Node::Iterator{std::make_unique<JsonValueIterator>(node_->End())};
+ } else if (node_->IsObject()) {  // objects: past the last member
+ return flow::Node::Iterator{std::make_unique<JsonMemberIterator>(node_->MemberEnd())};
+ } else {  // scalars and null cannot be iterated
+ throw std::runtime_error("Json node is not iterable, neither array nor object");
+ }
+}
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index a8b7ab5c1..996608dcf 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#pragma once
#include <memory>
@@ -32,35 +33,15 @@
#include "utils/Id.h"
#include "utils/StringUtils.h"
#include "utils/file/FileSystem.h"
-#include "yaml-cpp/yaml.h"
+#include "core/flow/StructuredConfiguration.h"
class YamlConfigurationTestAccessor;
namespace org::apache::nifi::minifi::core {
-static constexpr char const* CONFIG_YAML_FLOW_CONTROLLER_KEY = "Flow Controller";
-static constexpr char const* CONFIG_YAML_PROCESSORS_KEY = "Processors";
-static constexpr char const* CONFIG_YAML_CONTROLLER_SERVICES_KEY = "Controller Services";
-static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups";
-static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
-static constexpr char const* CONFIG_YAML_PROVENANCE_REPORT_KEY = "Provenance Reporting";
-static constexpr char const* CONFIG_YAML_FUNNELS_KEY = "Funnels";
-static constexpr char const* CONFIG_YAML_INPUT_PORTS_KEY = "Input Ports";
-static constexpr char const* CONFIG_YAML_OUTPUT_PORTS_KEY = "Output Ports";
-
-#define YAML_CONFIGURATION_USE_REGEX
-
-// Disable regex in EL for incompatible compilers
-#if __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 9)
-#undef YAML_CONFIGURATION_USE_REGEX
-#endif
-
-class YamlConfiguration : public FlowConfiguration {
+class YamlConfiguration : public flow::StructuredConfiguration {
public:
- explicit YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo,
- const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory,
- const std::shared_ptr<Configure>& configuration, const std::optional<std::filesystem::path>& path = {},
- const std::shared_ptr<utils::file::FileSystem>& filesystem = std::make_shared<utils::file::FileSystem>());
+ explicit YamlConfiguration(ConfigurationContext ctx);
~YamlConfiguration() override = default;
@@ -71,23 +52,7 @@ class YamlConfiguration : public FlowConfiguration {
* @return the root ProcessGroup node of the flow
* configuration tree
*/
- std::unique_ptr<core::ProcessGroup> getRoot() override {
- if (!config_path_) {
- logger_->log_error("Cannot instantiate flow, no config file is set.");
- throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
- }
- const auto configuration = filesystem_->read(config_path_.value());
- if (!configuration) {
- return nullptr;
- }
- try {
- YAML::Node rootYamlNode = YAML::Load(configuration.value());
- return getYamlRoot(rootYamlNode);
- } catch(...) {
- logger_->log_error("Invalid yaml configuration file");
- throw;
- }
- }
+ std::unique_ptr<core::ProcessGroup> getRoot() override;
/**
* Returns a shared pointer to a ProcessGroup object containing the
@@ -100,16 +65,7 @@ class YamlConfiguration : public FlowConfiguration {
* @return the root ProcessGroup node of the flow
* configuration tree
*/
- std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream) {
- try {
- YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
- return getYamlRoot(rootYamlNode);
- } catch (const YAML::ParserException &pe) {
- logger_->log_error(pe.what());
- std::rethrow_exception(std::current_exception());
- }
- return nullptr;
- }
+ std::unique_ptr<core::ProcessGroup> getYamlRoot(std::istream &yamlConfigStream);
/**
* Returns a shared pointer to a ProcessGroup object containing the
@@ -122,210 +78,7 @@ class YamlConfiguration : public FlowConfiguration {
* @return the root ProcessGroup node of the flow
* configuration tree
*/
- std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) override {
- try {
- YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
- return getYamlRoot(rootYamlNode);
- } catch (const YAML::ParserException &pe) {
- logger_->log_error(pe.what());
- std::rethrow_exception(std::current_exception());
- }
- return nullptr;
- }
-
- /**
- * Iterates all component property validation rules and checks that configured state
- * is valid. If state is determined to be invalid, conf parsing ends and an error is raised.
- *
- * @param component
- * @param component_name
- * @param yaml_section
- */
- void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const;
-
- protected:
- /**
- * Returns a shared pointer to a ProcessGroup object containing the
- * flow configuration. The rootYamlNode argument must point to
- * an YAML::Node object containing the root node of the parsed YAML
- * for the flow configuration.
- *
- * @param rootYamlNode a pointer to a YAML::Node object containing the root
- * node of the parsed YAML document
- * @return the root ProcessGroup node of the flow
- * configuration tree
- */
- std::unique_ptr<core::ProcessGroup> getYamlRoot(const YAML::Node& rootYamlNode);
-
- std::unique_ptr<core::ProcessGroup> createProcessGroup(const YAML::Node& yamlNode, bool is_root = false);
-
- std::unique_ptr<core::ProcessGroup> parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root = false);
- /**
- * Parses a processor from its corresponding YAML config node and adds
- * it to a parent ProcessGroup. The processorNode argument must point
- * to a YAML::Node containing the processor configuration. A Processor
- * object will be created a added to the parent ProcessGroup specified
- * by the parent argument.
- *
- * @param processorsNode the YAML::Node containing the processor configuration
- * @param parent the parent ProcessGroup to which the the created
- * Processor should be added
- */
- void parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parent);
-
- /**
- * Parses a port from its corressponding YAML config node and adds
- * it to a parent ProcessGroup. The portNode argument must point
- * to a YAML::Node containing the port configuration. A RemoteProcessorGroupPort
- * object will be created a added to the parent ProcessGroup specified
- * by the parent argument.
- *
- * @param portNode the YAML::Node containing the port configuration
- * @param parent the parent ProcessGroup for the port
- * @param direction the TransferDirection of the port
- */
- void parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction);
-
- /**
- * Parses the root level YAML node for the flow configuration and
- * returns a ProcessGroup containing the tree of flow configuration
- * objects.
- *
- * @param rootFlowNode
- * @return
- */
- std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(const YAML::Node& rootFlowNode);
-
- // Process Property YAML
- void parseProcessorPropertyYaml(const YAML::Node& doc, const YAML::Node& node, std::shared_ptr<core::Processor> processor);
- /**
- * Parse controller services
- * @param controllerServicesNode controller services YAML node.
- * @param parent parent process group.
- */
- void parseControllerServices(const YAML::Node& controllerServicesNode);
-
- /**
- * Parses the Connections section of a configuration YAML.
- * The resulting Connections are added to the parent ProcessGroup.
- *
- * @param node the YAML::Node containing the Connections section
- * of the configuration YAML
- * @param parent the root node of flow configuration to which
- * to add the connections that are parsed
- */
- void parseConnectionYaml(const YAML::Node& node, core::ProcessGroup* parent);
-
- /**
- * Parses the Remote Process Group section of a configuration YAML.
- * The resulting Process Group is added to the parent ProcessGroup.
- *
- * @param node the YAML::Node containing the Remote Process Group
- * section of the configuration YAML
- * @param parent the root node of flow configuration to which
- * to add the process groups that are parsed
- */
- void parseRemoteProcessGroupYaml(const YAML::Node& node, core::ProcessGroup* parent);
-
- /**
- * Parses the Provenance Reporting section of a configuration YAML.
- * The resulting Provenance Reporting processor is added to the
- * parent ProcessGroup.
- *
- * @param reportNode the YAML::Node containing the provenance
- * reporting configuration
- * @param parentGroup the root node of flow configuration to which
- * to add the provenance reporting config
- */
- void parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup);
-
- /**
- * A helper function to parse the Properties Node YAML for a processor.
- *
- * @param propertiesNode the YAML::Node containing the properties
- * @param processor the Processor to which to add the resulting properties
- */
- void parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name, const std::string& yaml_section);
-
- /**
- * Parses the Funnels section of a configuration YAML.
- * The resulting Funnels are added to the parent ProcessGroup.
- *
- * @param node the YAML::Node containing the Funnels section
- * of the configuration YAML
- * @param parent the root node of flow configuration to which
- * to add the funnels that are parsed
- */
- void parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent);
-
- /**
- * Parses the Input/Output Ports section of a configuration YAML.
- * The resulting ports are added to the parent ProcessGroup.
- *
- * @param node the YAML::Node containing the Input/Output Ports section
- * of the configuration YAML
- * @param parent the root node of flow configuration to which
- * to add the funnels that are parsed
- */
- void parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type);
-
- /**
- * A helper function for parsing or generating optional id fields.
- *
- * In parsing YAML flow configurations for config schema v1, the
- * 'id' field of most component types that contains a UUID is optional.
- * This function will check for the existence of the specified
- * idField in the specified yamlNode. If present, the field will be parsed
- * as a UUID and the UUID string will be returned. If not present, a
- * random UUID string will be generated and returned.
- *
- * @param yamlNode a pointer to the YAML::Node that will be checked for the
- * presence of an idField
- * @param idField the string of the name of the idField to check for. This
- * is optional and defaults to 'id'
- * @return the parsed or generated UUID string
- */
- std::string getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField = "id");
- std::string getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section = "", std::string_view error_message = "");
-
- /**
- * This is a helper function for getting an optional value, if it exists.
- * If it does not exist, returns the provided default value.
- *
- * @param yamlNode the YAML node to check
- * @param fieldName the optional field key
- * @param defaultValue the default value to use if field is not set
- * @param yamlSection [optional] the top level section of the YAML config
- * for the yamlNode. This is used fpr generating a
- * useful info message for troubleshooting.
- * @param infoMessage [optional] the info message string to use if
- * the optional field is missing. If not provided,
- * a default info message will be generated.
- */
- YAML::Node getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection = "", const std::string& infoMessage = "");
-
- protected:
- std::shared_ptr<io::StreamFactory> stream_factory_;
-
- private:
- PropertyValue getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode);
- void parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component);
- void parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
- void parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor);
- void addNewId(const std::string& uuid);
-
- std::shared_ptr<logging::Logger> logger_;
- static std::shared_ptr<utils::IdGenerator> id_generator_;
- std::unordered_set<std::string> uuids_;
-
- /**
- * Raises a human-readable configuration error for the given configuration component/section.
- *
- * @param component_name
- * @param yaml_section
- * @param reason
- */
- void raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const;
+ std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) override;
};
} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/yaml/YamlNode.h b/libminifi/include/core/yaml/YamlNode.h
new file mode 100644
index 000000000..ad8422f6e
--- /dev/null
+++ b/libminifi/include/core/yaml/YamlNode.h
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "yaml-cpp/yaml.h"
+#include "core/flow/Node.h"
+#include "utils/gsl.h"
+
+
+namespace org::apache::nifi::minifi::core {
+
+class YamlNode : public flow::Node::NodeImpl {  // flow::Node backend that wraps a YAML::Node held by value
+ public:
+ explicit YamlNode(YAML::Node node) : node_(std::move(node)) {}
+
+ explicit operator bool() const override {  // forwards the wrapped node's own bool conversion
+ return node_.operator bool();
+ }
+
+ bool isSequence() const override {
+ return node_.IsSequence();
+ }
+
+ bool isMap() const override {
+ return node_.IsMap();
+ }
+
+ bool isNull() const override {
+ return node_.IsNull();
+ }
+
+ nonstd::expected<std::string, std::exception_ptr> getString() const override {  // value as std::string, or the thrown exception captured
+ try {
+ return node_.as<std::string>();
+ } catch (...) {  // any exception from the conversion becomes the unexpected value
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ nonstd::expected<bool, std::exception_ptr> getBool() const override {  // value as bool, or the thrown exception captured
+ try {
+ return node_.as<bool>();
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ nonstd::expected<int64_t, std::exception_ptr> getInt64() const override {  // value as int64_t, or the thrown exception captured
+ try {
+ return node_.as<int64_t>();
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const override {  // same conversion as getString(): no check here that the text is an integer
+ try {
+ return node_.as<std::string>();
+ } catch (...) {
+ return nonstd::make_unexpected(std::current_exception());
+ }
+ }
+
+ std::string getDebugString() const override {  // short human-readable rendering; containers are shown as placeholders, not expanded
+ if (!node_) return "<invalid>";
+ if (node_.IsNull()) return "null";
+ if (node_.IsSequence()) return "<Array>";
+ if (node_.IsMap()) return "<Map>";
+ if (node_.IsScalar()) return '"' + node_.Scalar() + '"';
+ return "<unknown>";
+ }
+
+ size_t size() const override {  // forwarded unchecked; unlike JsonNode::size() there is no node-kind validation here
+ return node_.size();
+ }
+
+ flow::Node::Iterator begin() const override;  // defined after YamlIterator further down in this header
+
+ flow::Node::Iterator end() const override;  // defined after YamlIterator further down in this header
+
+ flow::Node operator[](std::string_view key) const override {  // the key is copied into a std::string for the lookup
+ return flow::Node{std::make_shared<YamlNode>(node_[std::string{key}])};
+ }
+
+ std::optional<flow::Node::Cursor> getCursor() const override {  // location of the node, when the wrapped node carries a mark
+ YAML::Mark mark = node_.Mark();
+ if (mark.is_null()) {  // no mark recorded for this node
+ return std::nullopt;
+ }
+ return flow::Node::Cursor{
+ .line = mark.line,
+ .column = mark.column,
+ .pos = mark.pos
+ };
+ }
+
+ private:
+ YAML::Node node_;
+};
+
+class YamlIterator : public flow::Node::Iterator::IteratorImpl {  // iterator implementation wrapping a YAML::const_iterator
+ public:
+ explicit YamlIterator(YAML::const_iterator it) : it_(std::move(it)) {}
+
+ IteratorImpl &operator++() override {  // pre-increment: advances the wrapped iterator
+ ++it_;
+ return *this;
+ }
+
+ bool operator==(const IteratorImpl &other) const override {
+ const auto *ptr = dynamic_cast<const YamlIterator *>(&other);  // only another YamlIterator can be compared
+ gsl_Expects(ptr);  // comparing with a different implementation is treated as a precondition violation
+ return it_ == ptr->it_;
+ }
+
+ flow::Node::Iterator::Value operator*() const override {
+ auto val = *it_;  // all three slots below are filled from this one dereferenced value
+ auto node = flow::Node{std::make_shared<YamlNode>(val)};  // the value itself
+ auto first = flow::Node{std::make_shared<YamlNode>(val.first)};  // its .first part
+ auto second = flow::Node{std::make_shared<YamlNode>(val.second)};  // its .second part
+ return flow::Node::Iterator::Value(node, first, second);
+ }
+
+ std::unique_ptr<IteratorImpl> clone() const override {  // independent copy at the same position
+ return std::make_unique<YamlIterator>(it_);
+ }
+
+ private:
+ YAML::const_iterator it_;
+};
+
+inline flow::Node::Iterator YamlNode::begin() const {  // inline: defined in a header, so it must be safe to include from several translation units
+ return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.begin())};  // start iterator over the wrapped YAML node
+}
+
+inline flow::Node::Iterator YamlNode::end() const {  // inline: defined in a header, so it must be safe to include from several translation units
+ return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.end())};  // past-the-end iterator over the wrapped YAML node
+}
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/utils/ValueCaster.h b/libminifi/include/utils/ValueCaster.h
index ad9dd5f0c..bc31b85c2 100644
--- a/libminifi/include/utils/ValueCaster.h
+++ b/libminifi/include/utils/ValueCaster.h
@@ -18,12 +18,7 @@
#pragma once
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-namespace internal {
+namespace org::apache::nifi::minifi::utils::internal {
template<class T, class U>
bool cast_if_in_range(T in, U& out) {
@@ -37,9 +32,4 @@ bool cast_if_in_range(T in, U& out) {
return true;
}
-} /* namespace internal */
-} /* namespace utils */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::utils::internal
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index ea4c40346..81e2bb1e0 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -29,48 +29,50 @@
#include "io/StreamFactory.h"
#include "core/yaml/YamlConfiguration.h"
+#include "core/json/JsonConfiguration.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
+
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const ConfigurationContext& ctx, const std::optional<std::string>& configuration_class_name, bool fail_safe) {  // picks the configuration implementation by class name, or by file extension when no name is given
+ std::string class_name_lc;
+ if (configuration_class_name) {  // an explicit class name takes precedence over the path
+ class_name_lc = configuration_class_name.value();
+ } else if (ctx.path) {  // otherwise infer the type from the file name suffix
+ if (utils::StringUtils::endsWith(ctx.path->string(), ".yml")) {
+ class_name_lc = "yamlconfiguration";
+ } else if (utils::StringUtils::endsWith(ctx.path->string(), ".json")) {
+ class_name_lc = "jsonconfiguration";
+ } else {
+ throw std::runtime_error("Could not infer config type from file path");  // thrown before the try block below, so fail_safe does not cover it
+ }
+ } else {
+ throw std::runtime_error("Neither configuration class nor config file path has been specified");  // likewise outside the fail_safe handling
+ }
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
- std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
- std::shared_ptr<io::StreamFactory> stream_factory, const std::string& configuration_class_name,
- const std::optional<std::string>& path, std::shared_ptr<utils::file::FileSystem> filesystem,
- bool fail_safe) {
- std::string class_name_lc = configuration_class_name;
std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
try {
if (class_name_lc == "flowconfiguration") {
// load the base configuration.
- return std::make_unique<core::FlowConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem);
+ return std::make_unique<core::FlowConfiguration>(ctx);
} else if (class_name_lc == "yamlconfiguration") {
// only load if the class is defined.
- return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(
- repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem));
+ return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(ctx));
+ } else if (class_name_lc == "jsonconfiguration") {
+ return std::unique_ptr<core::FlowConfiguration>(instantiate<core::JsonConfiguration>(ctx));  // same return construction as the yaml branch
} else {
if (fail_safe) {
- return std::make_unique<core::FlowConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem);
+ return std::make_unique<core::FlowConfiguration>(ctx);  // unknown class name: fall back to the base configuration
} else {
throw std::runtime_error("Support for the provided configuration class could not be found");
}
}
} catch (const std::runtime_error &) {
if (fail_safe) {
- return std::make_unique<core::FlowConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path, filesystem);
+ return std::make_unique<core::FlowConfiguration>(ctx);  // construction failed: fall back to the base configuration
}
}
throw std::runtime_error("Support for the provided configuration class could not be found");
}
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 6c773e2f2..c7981b86f 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -28,17 +28,13 @@
namespace org::apache::nifi::minifi::core {
-FlowConfiguration::FlowConfiguration(
- const std::shared_ptr<core::Repository>& /*repo*/, std::shared_ptr<core::Repository> flow_file_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<io::StreamFactory> stream_factory,
- std::shared_ptr<Configure> configuration, const std::optional<std::filesystem::path>& path,
- std::shared_ptr<utils::file::FileSystem> filesystem)
+FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
: CoreComponent(core::getClassName<FlowConfiguration>()),
- flow_file_repo_(std::move(flow_file_repo)),
- content_repo_(std::move(content_repo)),
- stream_factory_(std::move(stream_factory)),
- configuration_(std::move(configuration)),
- filesystem_(std::move(filesystem)),
+ flow_file_repo_(std::move(ctx.flow_file_repo)),
+ content_repo_(std::move(ctx.content_repo)),
+ stream_factory_(std::move(ctx.stream_factory)),
+ configuration_(std::move(ctx.configuration)),
+ filesystem_(std::move(ctx.filesystem)),
logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
@@ -49,13 +45,13 @@ FlowConfiguration::FlowConfiguration(
configuration_->get(Configure::nifi_c2_flow_url, flowUrl);
flow_version_ = std::make_shared<state::response::FlowVersion>(flowUrl, bucket_id, flowId);
- if (!path) {
+ if (!ctx.path) {
logger_->log_error("Configuration path is not specified.");
} else {
- config_path_ = utils::file::canonicalize(*path);
+ config_path_ = utils::file::canonicalize(*ctx.path);
if (!config_path_) {
- logger_->log_error("Couldn't find config file \"%s\".", path->string());
- config_path_ = path;
+ logger_->log_error("Couldn't find config file \"%s\".", ctx.path->string());
+ config_path_ = ctx.path;
}
checksum_calculator_.setFileLocation(*config_path_);
}
diff --git a/libminifi/src/core/flow/CheckRequiredField.cpp b/libminifi/src/core/flow/CheckRequiredField.cpp
new file mode 100644
index 000000000..6942a5844
--- /dev/null
+++ b/libminifi/src/core/flow/CheckRequiredField.cpp
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+
+#include "core/flow/CheckRequiredField.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+/**
+ * Reports whether `node` has a child entry called `field_name`.
+ *
+ * @param node        node in which the field is looked up
+ * @param field_name  key of the child entry
+ * @return the truth value of the child node obtained through `node[field_name]`
+ */
+bool isFieldPresent(const Node &node, std::string_view field_name) {
+  const auto field = node[field_name];
+  return static_cast<bool>(field);
+}
+
+/**
+ * Composes the diagnostic used when none of the accepted names of a required field is found.
+ *
+ * The text names the component when `node` has a "name" entry, lists `alternate_field_names`
+ * joined by ", ", and appends the section and the node's cursor position when those are available.
+ *
+ * @param node                   node in which the field was looked up
+ * @param alternate_field_names  accepted names of the missing field
+ * @param section                configuration section being parsed; left out of the text when empty
+ * @return the assembled message
+ */
+std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section) {
+  const Node component_name = node["name"];
+  const auto candidates = utils::StringUtils::join(", ", alternate_field_names);
+
+  std::string message;
+  if (component_name) {
+    // Mention the component by name so the user can find the offending entry in the config file.
+    message = "Unable to parse configuration file for component named '" + component_name.getString().value() + "' as none of the possible required fields [" + candidates + "] is available";
+  } else {
+    message = "Unable to parse configuration file as none of the possible required fields [" + candidates + "] is available";
+  }
+
+  if (!section.empty()) {
+    message += " [in '" + std::string(section) + "' section of configuration file]";
+  }
+  if (const auto cursor = node.getCursor()) {
+    message += " [line:column, pos at " + std::to_string(cursor->line) + ":" + std::to_string(cursor->column) + ", " + std::to_string(cursor->pos) + "]";
+  }
+  return message;
+}
+
+/**
+ * Verifies that `node` contains the field `field_name`.
+ *
+ * @param node           node expected to hold the field
+ * @param field_name     key of the required field
+ * @param section        configuration section being parsed; used in the generated message
+ * @param error_message  text to throw instead of the generated message; ignored when empty
+ * @throws std::invalid_argument if the field is absent
+ */
+void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section, std::string_view error_message) {
+  if (isFieldPresent(node, field_name)) {
+    return;
+  }
+  if (error_message.empty()) {
+    throw std::invalid_argument(buildErrorMessage(node, std::vector<std::string>{std::string(field_name)}, section));
+  }
+  // A string_view is not guaranteed to be null-terminated, so copy exactly size() characters
+  // instead of handing data() to the const char* constructor.
+  throw std::invalid_argument(std::string(error_message));
+}
+
+/**
+ * Returns the string value of the first field of `node` whose name appears in `alternate_names`.
+ *
+ * The names are tried in the order given.
+ *
+ * @param node             node expected to hold one of the fields
+ * @param alternate_names  accepted names of the required field
+ * @param section          configuration section being parsed; used in the generated message
+ * @param error_message    text to throw instead of the generated message; ignored when empty
+ * @return the value of the first matching field, read through `getString()`
+ * @throws std::invalid_argument if none of the names is present in `node`
+ */
+std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message) {
+  for (const auto& name : alternate_names) {
+    // Look the field up once and reuse the result for both the presence check and the read.
+    if (const Node field = node[name]) {
+      return field.getString().value();
+    }
+  }
+  if (error_message.empty()) {
+    throw std::invalid_argument(buildErrorMessage(node, alternate_names, section));
+  }
+  // A string_view is not guaranteed to be null-terminated, so copy exactly size() characters
+  // instead of handing data() to the const char* constructor.
+  throw std::invalid_argument(std::string(error_message));
+}
+
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/include/Defaults.h b/libminifi/src/core/flow/Node.cpp
similarity index 59%
copy from libminifi/include/Defaults.h
copy to libminifi/src/core/flow/Node.cpp
index 0ee8eaa48..7fd7b0df8 100644
--- a/libminifi/include/Defaults.h
+++ b/libminifi/src/core/flow/Node.cpp
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,10 +16,12 @@
* limitations under the License.
*/
-#pragma once
+#include "core/flow/Node.h"
+
+namespace org::apache::nifi::minifi::core::flow {
+
+// Dereferences the iterator by forwarding to the implementation object held in impl_.
+Node::Iterator::Value Node::Iterator::operator*() const {
+  auto& impl = *impl_;
+  return *impl;
+}
-const std::filesystem::path DEFAULT_NIFI_CONFIG_YML = std::filesystem::path("conf") / "config.yml";
-const std::filesystem::path DEFAULT_NIFI_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi.properties";
-const std::filesystem::path DEFAULT_LOG_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-log.properties";
-const std::filesystem::path DEFAULT_UID_PROPERTIES_FILE = std::filesystem::path("conf") / "minifi-uid.properties";
-const std::filesystem::path DEFAULT_BOOTSTRAP_FILE = std::filesystem::path("conf") / "bootstrap.conf";
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp
similarity index 51%
copy from libminifi/src/core/yaml/YamlConfiguration.cpp
copy to libminifi/src/core/flow/StructuredConfiguration.cpp
index f69f73de4..7e1d41e63 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -21,50 +21,42 @@
#include <set>
#include <cinttypes>
-#include "core/yaml/YamlConfiguration.h"
-#include "core/yaml/CheckRequiredField.h"
-#include "core/yaml/YamlConnectionParser.h"
+#include "core/flow/StructuredConfiguration.h"
+#include "core/flow/CheckRequiredField.h"
+#include "core/flow/StructuredConnectionParser.h"
#include "core/state/Value.h"
#include "Defaults.h"
#include "utils/TimeUtil.h"
+#include "utils/RegexUtils.h"
#include "Funnel.h"
-#ifdef YAML_CONFIGURATION_USE_REGEX
-#include "utils/RegexUtils.h"
-#endif // YAML_CONFIGURATION_USE_REGEX
-
-namespace org::apache::nifi::minifi::core {
-
-std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo,
- const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory,
- const std::shared_ptr<Configure>& configuration, const std::optional<std::filesystem::path>& path,
- const std::shared_ptr<utils::file::FileSystem>& filesystem)
- : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration,
- path.value_or(DEFAULT_NIFI_CONFIG_YML), filesystem),
- stream_factory_(stream_factory),
- logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) {
- auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY];
- auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true);
- this->name_ = rootGroup->getName();
- return rootGroup;
+namespace org::apache::nifi::minifi::core::flow {
+
+std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger)
+ : FlowConfiguration(std::move(ctx)),
+ logger_(std::move(logger)) {}
+
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) {
+ auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY];
+ auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true);
+ this->name_ = root_group->getName();
+ return root_group;
}
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) {
int version = 0;
- yaml::checkRequiredField(yamlNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto flowName = yamlNode["name"].as<std::string>();
+ checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ auto flowName = node["name"].getString().value();
utils::Identifier uuid;
// assignment throws on invalid uuid
- uuid = getOrGenerateId(yamlNode);
+ uuid = getOrGenerateId(node);
- if (yamlNode["version"]) {
- version = yamlNode["version"].as<int>();
+ if (node["version"]) {
+ version = gsl::narrow<int>(node["version"].getInt64().value());
}
logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
@@ -75,8 +67,8 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const
group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
}
- if (yamlNode["onschedule retry interval"]) {
- auto onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
+ if (node["onschedule retry interval"]) {
+ auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value();
logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
@@ -89,51 +81,49 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const
return group;
}
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) {
- auto group = createProcessGroup(headerNode, is_root);
- YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
- YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
- YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
- YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
- YAML::Node remoteProcessingGroupsNode = [&] {
- // assignment is not supported on invalid Yaml nodes
- YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& header_node, const Node& node, bool is_root) {
+ auto group = createProcessGroup(header_node, is_root);
+ Node processorsNode = node[CONFIG_PROCESSORS_KEY];
+ Node connectionsNode = node[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY];
+ Node funnelsNode = node[CONFIG_FUNNELS_KEY];
+ Node inputPortsNode = node[CONFIG_INPUT_PORTS_KEY];
+ Node outputPortsNode = node[CONFIG_OUTPUT_PORTS_KEY];
+ Node remoteProcessingGroupsNode = [&] {
+ // assignment is not supported on invalid nodes
+ Node candidate = node[CONFIG_REMOTE_PROCESS_GROUP_KEY];
if (candidate) {
return candidate;
}
- return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+ return node[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3];
}();
- YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"];
+ Node childProcessGroupNodeSeq = node["Process Groups"];
- parseProcessorNodeYaml(processorsNode, group.get());
- parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
- parseFunnelsYaml(funnelsNode, group.get());
+ parseProcessorNode(processorsNode, group.get());
+ parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get());
+ parseFunnels(funnelsNode, group.get());
parsePorts(inputPortsNode, group.get(), PortType::INPUT);
parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
- if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
- for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
- auto childProcessGroupNode = it->as<YAML::Node>();
- group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
+ if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.isSequence()) {
+ for (const auto& childProcessGroupNode : childProcessGroupNodeSeq) {
+ group->addProcessGroup(parseProcessGroup(childProcessGroupNode, childProcessGroupNode));
}
}
-
// parse connections last to give feedback if the source and/or destination processors
// is not in the same process group or input/output port connections are not allowed
- parseConnectionYaml(connectionsNode, group.get());
+ parseConnection(connectionsNode, group.get());
return group;
}
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
+std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node) {
uuids_.clear();
- YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
- YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+ Node controllerServiceNode = root_node[CONFIG_CONTROLLER_SERVICES_KEY];
+ Node provenanceReportNode = root_node[CONFIG_PROVENANCE_REPORT_KEY];
parseControllerServices(controllerServiceNode);
// Create the root process group
- std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
- parseProvenanceReportingYaml(provenanceReportNode, root.get());
+ std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroup(root_node);
+ parseProvenanceReporting(provenanceReportNode, root.get());
// set the controller services into the root group.
for (const auto& controller_service : controller_services_->getAllControllerServices()) {
@@ -146,36 +136,35 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::N
return root;
}
-void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
+void StructuredConfiguration::parseProcessorNode(const Node& processors_node, core::ProcessGroup* parentGroup) {
int64_t runDurationNanos = -1;
utils::Identifier uuid;
std::unique_ptr<core::Processor> processor;
if (!parentGroup) {
- logger_->log_error("parseProcessNodeYaml: no parent group exists");
+ logger_->log_error("parseProcessNode: no parent group exists");
return;
}
- if (!processorsNode) {
+ if (!processors_node) {
throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
- if (!processorsNode.IsSequence()) {
+ if (!processors_node.isSequence()) {
throw std::invalid_argument(
"Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
// Evaluate sequence of processors
- for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
+ for (const auto& procNode : processors_node) {
core::ProcessorConfig procCfg;
- const auto procNode = iter->as<YAML::Node>();
- yaml::checkRequiredField(procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
- procCfg.name = procNode["name"].as<std::string>();
+ checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY);
+ procCfg.name = procNode["name"].getString().value();
procCfg.id = getOrGenerateId(procNode);
uuid = procCfg.id;
logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
- yaml::checkRequiredField(procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
- procCfg.javaClass = procNode["class"].as<std::string>();
+ checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY);
+ procCfg.javaClass = procNode["class"].getString().value();
logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
// Determine the processor name only from the Java class
@@ -198,45 +187,40 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
- auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
- CONFIG_YAML_PROCESSORS_KEY);
- procCfg.schedulingStrategy = strategyNode.as<std::string>();
+ procCfg.schedulingStrategy = getOptionalField(procNode, "scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY);
logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
- auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
- CONFIG_YAML_PROCESSORS_KEY);
+ procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY);
- procCfg.schedulingPeriod = periodNode.as<std::string>();
logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
- if (procNode["max concurrent tasks"]) {
- procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+ if (auto tasksNode = procNode["max concurrent tasks"]) {
+ procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
}
if (procNode["penalization period"]) {
- procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+ procCfg.penalizationPeriod = procNode["penalization period"].getString().value();
logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
}
if (procNode["yield period"]) {
- procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+ procCfg.yieldPeriod = procNode["yield period"].getString().value();
logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
}
- if (procNode["run duration nanos"]) {
- procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
+ if (auto runNode = procNode["run duration nanos"]) {
+ procCfg.runDurationNanos = runNode.getIntegerAsString().value();
logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
}
// handle auto-terminated relationships
if (procNode["auto-terminated relationships list"]) {
- YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+ Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
- auto autoTerminatedRel = relIter->as<std::string>();
- rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+ if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
+ for (const auto& autoTerminatedRel : autoTerminatedSequence) {
+ rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel.getString().value());
}
}
procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
@@ -244,8 +228,8 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
// handle processor properties
if (procNode["Properties"]) {
- YAML::Node propertiesNode = procNode["Properties"];
- parsePropertiesNodeYaml(propertiesNode, *processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
+ Node propertiesNode = procNode["Properties"];
+ parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY);
}
// Take care of scheduling
@@ -307,89 +291,85 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
}
}
-void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) {
+void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, core::ProcessGroup* parentGroup) {
utils::Identifier uuid;
std::string id;
if (!parentGroup) {
- logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
+ logger_->log_error("parseRemoteProcessGroup: no parent group exists");
return;
}
- if (!rpgNode || !rpgNode.IsSequence()) {
+ if (!rpg_node_seq || !rpg_node_seq.isSequence()) {
return;
}
- for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) {
- auto currRpgNode = iter->as<YAML::Node>();
-
- yaml::checkRequiredField(currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto name = currRpgNode["name"].as<std::string>();
+ for (const auto& currRpgNode : rpg_node_seq) {
+ checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ auto name = currRpgNode["name"].getString().value();
id = getOrGenerateId(currRpgNode);
- logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+ logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id);
- auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""),
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY);
- auto url = urlNode.as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
+ logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url);
uuid = id;
- auto group = this->createRemoteProcessGroup(name, uuid);
+ auto group = createRemoteProcessGroup(name, uuid);
group->setParent(parentGroup);
if (currRpgNode["yield period"]) {
- auto yieldPeriod = currRpgNode["yield period"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
+ auto yieldPeriod = currRpgNode["yield period"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod);
auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
if (yield_period_value.has_value() && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
+ logger_->log_debug("parseRemoteProcessGroup: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
group->setYieldPeriodMsec(*yield_period_value);
}
}
if (currRpgNode["timeout"]) {
- auto timeout = currRpgNode["timeout"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
+ auto timeout = currRpgNode["timeout"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout);
auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
if (timeout_value.has_value() && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
+ logger_->log_debug("parseRemoteProcessGroup: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
group->setTimeout(timeout_value->count());
}
}
if (currRpgNode["local network interface"]) {
- auto interface = currRpgNode["local network interface"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
+ auto interface = currRpgNode["local network interface"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", interface);
group->setInterface(interface);
}
if (currRpgNode["transport protocol"]) {
- auto transport_protocol = currRpgNode["transport protocol"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
+ auto transport_protocol = currRpgNode["transport protocol"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol);
if (transport_protocol == "HTTP") {
group->setTransportProtocol(transport_protocol);
if (currRpgNode["proxy host"]) {
- auto http_proxy_host = currRpgNode["proxy host"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
+ auto http_proxy_host = currRpgNode["proxy host"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host);
group->setHttpProxyHost(http_proxy_host);
if (currRpgNode["proxy user"]) {
- auto http_proxy_username = currRpgNode["proxy user"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
+ auto http_proxy_username = currRpgNode["proxy user"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username);
group->setHttpProxyUserName(http_proxy_username);
}
if (currRpgNode["proxy password"]) {
- auto http_proxy_password = currRpgNode["proxy password"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
+ auto http_proxy_password = currRpgNode["proxy password"].getString().value();
+ logger_->log_debug("parseRemoteProcessGroup: proxy password => [%s]", http_proxy_password);
group->setHttpProxyPassWord(http_proxy_password);
}
if (currRpgNode["proxy port"]) {
- auto http_proxy_port = currRpgNode["proxy port"].as<std::string>();
+ auto http_proxy_port = currRpgNode["proxy port"].getIntegerAsString().value();
int32_t port;
if (core::Property::StringToInt(http_proxy_port, port)) {
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
+ logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port);
group->setHttpProxyPort(port);
}
}
@@ -406,50 +386,44 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, c
group->setTransmitting(true);
group->setURL(url);
- yaml::checkRequiredField(currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
- if (inputPorts && inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
- auto currPort = portIter->as<YAML::Node>();
-
- this->parsePortYaml(currPort, group.get(), sitetosite::SEND);
+ checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ auto inputPorts = currRpgNode["Input Ports"];
+ if (inputPorts && inputPorts.isSequence()) {
+ for (const auto& currPort : inputPorts) {
+ parsePort(currPort, group.get(), sitetosite::SEND);
} // for node
}
- auto outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
- if (outputPorts && outputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
+ auto outputPorts = currRpgNode["Output Ports"];
+ if (outputPorts && outputPorts.isSequence()) {
+ for (const auto& currPort : outputPorts) {
logger_->log_debug("Got a current port, iterating...");
- auto currPort = portIter->as<YAML::Node>();
-
- this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE);
+ parsePort(currPort, group.get(), sitetosite::RECEIVE);
} // for node
}
parentGroup->addProcessGroup(std::move(group));
}
}
-void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
+void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::ProcessGroup* parent_group) {
utils::Identifier port_uuid;
- if (!parentGroup) {
- logger_->log_error("parseProvenanceReportingYaml: no parent group exists");
+ if (!parent_group) {
+ logger_->log_error("parseProvenanceReporting: no parent group exists");
return;
}
- if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) {
+ if (!node || node.isNull()) {
logger_->log_debug("no provenance reporting task specified");
return;
}
auto reportTask = createProvenanceReportTask();
- const auto node = reportNode.as<YAML::Node>();
-
- yaml::checkRequiredField(node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
- yaml::checkRequiredField(node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
+ checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY);
+ auto schedulingStrategyStr = node["scheduling strategy"].getString().value();
+ checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY);
+ auto schedulingPeriodStr = node["scheduling period"].getString().value();
if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
@@ -465,9 +439,9 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
int64_t lvalue;
if (node["host"] && node["port"]) {
- auto hostStr = node["host"].as<std::string>();
+ auto hostStr = node["host"].getString().value();
- auto portStr = node["port"].as<std::string>();
+ std::string portStr = node["port"].getIntegerAsString().value();
if (core::Property::StringToInt(portStr, lvalue) && !hostStr.empty()) {
logger_->log_debug("ProvenanceReportingTask port %" PRId64, lvalue);
std::string url = hostStr + ":" + portStr;
@@ -476,16 +450,16 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
}
if (node["url"]) {
- auto urlStr = node["url"].as<std::string>();
+ auto urlStr = node["url"].getString().value();
if (!urlStr.empty()) {
reportTask->setURL(urlStr);
logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
}
}
- yaml::checkRequiredField(node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto portUUIDStr = node["port uuid"].as<std::string>();
- yaml::checkRequiredField(node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto batchSizeStr = node["batch size"].as<std::string>();
+ checkRequiredField(node, "port uuid", CONFIG_PROVENANCE_REPORT_KEY);
+ auto portUUIDStr = node["port uuid"].getString().value();
+ checkRequiredField(node, "batch size", CONFIG_PROVENANCE_REPORT_KEY);
+ auto batchSizeStr = node["batch size"].getString().value();
logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
port_uuid = portUUIDStr;
@@ -499,72 +473,72 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
// add processor to parent
reportTask->setScheduledState(core::RUNNING);
- parentGroup->addProcessor(std::move(reportTask));
+ parent_group->addProcessor(std::move(reportTask));
}
-void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) {
- if (!controllerServicesNode || !controllerServicesNode.IsSequence()) {
+void StructuredConfiguration::parseControllerServices(const Node& controller_services_node) {
+ if (!controller_services_node || !controller_services_node.isSequence()) {
return;
}
- for (const auto& iter : controllerServicesNode) {
- const auto controllerServiceNode = iter.as<YAML::Node>();
- try {
- yaml::checkRequiredField(controllerServiceNode, "name", CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ for (const auto& service_node : controller_services_node) {
+ checkRequiredField(service_node, "name", CONFIG_CONTROLLER_SERVICES_KEY);
- auto type = yaml::getRequiredField(controllerServiceNode, std::vector<std::string>{"class", "type"}, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- logger_->log_debug("Using type %s for controller service node", type);
+ auto type = getRequiredField(service_node, std::vector<std::string>{"class", "type"}, CONFIG_CONTROLLER_SERVICES_KEY);
+ logger_->log_debug("Using type %s for controller service node", type);
- std::string fullType = type;
- auto lastOfIdx = type.find_last_of('.');
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- type = type.substr(lastOfIdx);
- }
+ std::string fullType = type;
+ auto lastOfIdx = type.find_last_of('.');
+ if (lastOfIdx != std::string::npos) {
+ lastOfIdx++; // if a value is found, increment to move beyond the .
+ type = type.substr(lastOfIdx);
+ }
- auto name = controllerServiceNode["name"].as<std::string>();
- auto id = getRequiredIdField(controllerServiceNode, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ auto name = service_node["name"].getString().value();
+ auto id = getRequiredIdField(service_node, CONFIG_CONTROLLER_SERVICES_KEY);
- utils::Identifier uuid;
- uuid = id;
- std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid);
- if (nullptr != controller_service_node) {
- logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
- controller_service_node->initialize();
- YAML::Node propertiesNode = controllerServiceNode["Properties"];
+ utils::Identifier uuid;
+ uuid = id;
+ std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid);
+ if (nullptr != controller_service_node) {
+ logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
+ controller_service_node->initialize();
+ if (Node propertiesNode = service_node["Properties"]) {
// we should propagate properties to the node and to the implementation
- parsePropertiesNodeYaml(propertiesNode, *controller_service_node, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ parsePropertiesNode(propertiesNode, *controller_service_node, name, CONFIG_CONTROLLER_SERVICES_KEY);
if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
- parsePropertiesNodeYaml(propertiesNode, *controllerServiceImpl, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ parsePropertiesNode(propertiesNode, *controllerServiceImpl, name, CONFIG_CONTROLLER_SERVICES_KEY);
}
- } else {
- logger_->log_debug("Could not locate %s", type);
}
- controller_services_->put(id, controller_service_node);
- controller_services_->put(name, controller_service_node);
- } catch (YAML::InvalidNode &) {
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
+ } else {
+ logger_->log_debug("Could not locate %s", type);
}
+ controller_services_->put(id, controller_service_node);
+ controller_services_->put(name, controller_service_node);
}
}
-void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) {
+void StructuredConfiguration::parseConnection(const Node& connection_node_seq, core::ProcessGroup* parent) {
if (!parent) {
logger_->log_error("parseProcessNode: no parent group was provided");
return;
}
- if (!connectionsNode || !connectionsNode.IsSequence()) {
+ if (!connection_node_seq || !connection_node_seq.isSequence()) {
return;
}
- for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) {
- const auto connectionNode = iter->as<YAML::Node>();
-
+ for (const auto& connection_node : connection_node_seq) {
+ // for backwards compatibility we ignore invalid connection_nodes instead of throwing
+ // previously the ConnectionParser created an unreachable connection in this case
+ if (!connection_node || !connection_node.isMap()) {
+ logger_->log_error("Invalid connection node, ignoring");
+ continue;
+ }
// Configure basic connection
- const std::string id = getOrGenerateId(connectionNode);
+ const std::string id = getOrGenerateId(connection_node);
// Default name to be same as ID
// If name is specified in configuration, use the value
- const auto name = connectionNode["name"].as<std::string>(id);
+ const auto name = connection_node["name"].getString().value_or(id);
const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
logger_->log_debug("Incorrect connection UUID format.");
@@ -573,21 +547,21 @@ void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, c
auto connection = createConnection(name, uuid.value());
logger_->log_debug("Created connection with UUID %s and name %s", id, name);
- const yaml::YamlConnectionParser connectionParser(connectionNode, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
- connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection);
- connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
- connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
- connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml());
- connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
- connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
- connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
- connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml());
+ const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+ connectionParser.configureConnectionSourceRelationships(*connection);
+ connection->setMaxQueueSize(connectionParser.getWorkQueueSize());
+ connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize());
+ connection->setSwapThreshold(connectionParser.getSwapThreshold());
+ connection->setSourceUUID(connectionParser.getSourceUUID());
+ connection->setDestinationUUID(connectionParser.getDestinationUUID());
+ connection->setFlowExpirationDuration(connectionParser.getFlowFileExpiration());
+ connection->setDropEmptyFlowFiles(connectionParser.getDropEmpty());
parent->addConnection(std::move(connection));
}
}
-void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
+void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
utils::Identifier uuid;
if (!parent) {
@@ -595,14 +569,12 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
return;
}
- const auto inputPortsObj = portNode.as<YAML::Node>();
-
// Check for required fields
- yaml::checkRequiredField(inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto nameStr = inputPortsObj["name"].as<std::string>();
- auto portId = getRequiredIdField(inputPortsObj, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
+ checkRequiredField(port_node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ auto nameStr = port_node["name"].getString().value();
+ auto portId = getRequiredIdField(port_node, CONFIG_REMOTE_PROCESS_GROUP_KEY,
"The field 'id' is required for "
- "the port named '" + nameStr + "' in the YAML Config. If this port "
+ "the port named '" + nameStr + "' in the Flow Config. If this port "
"is an input port for a NiFi Remote Process Group, the port "
"id should match the corresponding id specified in the NiFi configuration. "
"This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
@@ -625,17 +597,17 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
// else defaults to RAW
// handle port properties
- const auto nodeVal = portNode.as<YAML::Node>();
- YAML::Node propertiesNode = nodeVal["Properties"];
- parsePropertiesNodeYaml(propertiesNode, *port, nameStr, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ if (Node propertiesNode = port_node["Properties"]) {
+ parsePropertiesNode(propertiesNode, *port, nameStr, CONFIG_REMOTE_PROCESS_GROUP_KEY);
+ }
// add processor to parent
auto& processor = *port;
parent->addProcessor(std::move(port));
processor.setScheduledState(core::RUNNING);
- if (inputPortsObj["max concurrent tasks"]) {
- auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
+ if (auto tasksNode = port_node["max concurrent tasks"]) {
+ std::string rawMaxConcurrentTasks = tasksNode.getIntegerAsString().value();
int32_t maxConcurrentTasks;
if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
processor.setMaxConcurrentTasks(maxConcurrentTasks);
@@ -645,22 +617,21 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
}
}
-void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component) {
- for (const auto& iter : propertyValueNode) {
- if (iter.IsDefined()) {
- const auto nodeVal = iter.as<YAML::Node>();
- YAML::Node propertiesNode = nodeVal["value"];
+void StructuredConfiguration::parsePropertyValueSequence(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& component) {
+ for (const auto& nodeVal : property_value_node) {
+ if (nodeVal) {
+ Node propertiesNode = nodeVal["value"];
// must insert the sequence in differently.
- const auto rawValueString = propertiesNode.as<std::string>();
- logger_->log_debug("Found %s=%s", propertyName, rawValueString);
- if (!component.updateProperty(propertyName, rawValueString)) {
+ const auto rawValueString = propertiesNode.getString().value();
+ logger_->log_debug("Found %s=%s", property_name, rawValueString);
+ if (!component.updateProperty(property_name, rawValueString)) {
auto proc = dynamic_cast<core::Connectable*>(&component);
if (proc) {
- logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
- if (!component.setDynamicProperty(propertyName, rawValueString)) {
- logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
+ logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
+ if (!component.setDynamicProperty(property_name, rawValueString)) {
+ logger_->log_warn("Unable to set the dynamic property %s with value %s", property_name, rawValueString);
} else {
- logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
+ logger_->log_warn("Dynamic property %s with value %s set", property_name, rawValueString);
}
}
}
@@ -668,118 +639,114 @@ void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyNa
}
}
-PropertyValue YamlConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode) {
+PropertyValue StructuredConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& property_from_processor, const Node& property_value_node) {
+ using state::response::Value;
PropertyValue defaultValue;
- defaultValue = propertyFromProcessor.getDefaultValue();
+ defaultValue = property_from_processor.getDefaultValue();
const std::type_index defaultType = defaultValue.getTypeInfo();
try {
PropertyValue coercedValue = defaultValue;
- if (defaultType == typeid(int64_t)) {
- coercedValue = propertyValueNode.as<int64_t>();
- } else if (defaultType == typeid(uint64_t)) {
- uint64_t integer_value;
- if (YAML::convert<uint64_t>::decode(propertyValueNode, integer_value)) {
- coercedValue = integer_value;
- } else {
- coercedValue = propertyValueNode.as<std::string>();
- }
- } else if (defaultType == typeid(int)) {
- coercedValue = propertyValueNode.as<int>();
- } else if (defaultType == typeid(bool)) {
- coercedValue = propertyValueNode.as<bool>();
+ auto int64_val = property_value_node.getInt64();
+ if (defaultType == Value::INT64_TYPE && int64_val) {
+ coercedValue = gsl::narrow<int64_t>(int64_val.value());
+ } else if (defaultType == Value::UINT64_TYPE && int64_val) {
+ coercedValue = gsl::narrow<uint64_t>(int64_val.value());
+ } else if (defaultType == Value::UINT32_TYPE && int64_val) {
+ coercedValue = gsl::narrow<uint32_t>(int64_val.value());
+ } else if (defaultType == Value::INT_TYPE && int64_val) {
+ coercedValue = gsl::narrow<int>(int64_val.value());
+ } else if (defaultType == Value::BOOL_TYPE && property_value_node.getBool()) {
+ coercedValue = property_value_node.getBool().value();
} else {
- coercedValue = propertyValueNode.as<std::string>();
+ coercedValue = property_value_node.getString().value();
}
return coercedValue;
} catch (const std::exception& e) {
logger_->log_error("Fetching property failed with an exception of %s", e.what());
- logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
+ logger_->log_error("Invalid conversion for field %s. Value %s", property_from_processor.getName(), property_value_node.getDebugString());
} catch (...) {
- logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
+ logger_->log_error("Invalid conversion for field %s. Value %s", property_from_processor.getName(), property_value_node.getDebugString());
}
return defaultValue;
}
-void YamlConfiguration::parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
- core::Property myProp(propertyName, "", "");
- processor.getProperty(propertyName, myProp);
- const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, propertyValueNode);
+void StructuredConfiguration::parseSingleProperty(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor) {
+ core::Property myProp(property_name, "", "");
+ processor.getProperty(property_name, myProp);
+ const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, property_value_node);
bool property_set = false;
try {
property_set = processor.setProperty(myProp, coercedValue);
} catch(const utils::internal::InvalidValueException&) {
auto component = dynamic_cast<core::CoreComponent*>(&processor);
if (component == nullptr) {
- logger_->log_error("processor was not a CoreComponent for property '%s'", propertyName);
+ logger_->log_error("processor was not a CoreComponent for property '%s'", property_name);
} else {
- logger_->log_error("Invalid value was set for property '%s' creating component '%s'", propertyName, component->getName());
+ logger_->log_error("Invalid value was set for property '%s' creating component '%s'", property_name, component->getName());
}
throw;
}
- const auto rawValueString = propertyValueNode.as<std::string>();
if (!property_set) {
+ const auto rawValueString = property_value_node.getString().value();
auto proc = dynamic_cast<core::Connectable*>(&processor);
if (proc) {
- logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
- if (!processor.setDynamicProperty(propertyName, rawValueString)) {
- logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
+ logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName());
+ if (!processor.setDynamicProperty(property_name, rawValueString)) {
+ logger_->log_warn("Unable to set the dynamic property %s with value %s", property_name, rawValueString);
} else {
- logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
+ logger_->log_warn("Dynamic property %s with value %s set", property_name, rawValueString);
}
}
} else {
- logger_->log_debug("Property %s with value %s set", propertyName, rawValueString);
+ logger_->log_debug("Property %s with value %s set", property_name, coercedValue.to_string());
}
}
-void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
- logger_->log_trace("Encountered %s", propertyName);
- if (propertyValueNode.IsNull() || !propertyValueNode.IsDefined()) {
+void StructuredConfiguration::parsePropertyNodeElement(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& processor) {
+ logger_->log_trace("Encountered %s", property_name);
+ if (!property_value_node || property_value_node.isNull()) {
return;
}
- if (propertyValueNode.IsSequence()) {
- parsePropertyValueSequence(propertyName, propertyValueNode, processor);
+ if (property_value_node.isSequence()) {
+ parsePropertyValueSequence(property_name, property_value_node, processor);
} else {
- parseSingleProperty(propertyName, propertyValueNode, processor);
+ parseSingleProperty(property_name, property_value_node, processor);
}
}
-void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name,
- const std::string& yaml_section) {
- // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
+void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section) {
+ // Treat generically as a node so we can perform inspection on entries to ensure they are populated
logger_->log_trace("Entered %s", component_name);
- for (const auto& propertyElem : propertiesNode) {
- const auto propertyName = propertyElem.first.as<std::string>();
- const YAML::Node propertyValueNode = propertyElem.second;
+ for (const auto& property_node : properties_node) {
+ const auto propertyName = property_node.first.getString().value();
+ const Node propertyValueNode = property_node.second;
parsePropertyNodeElement(propertyName, propertyValueNode, component);
}
- validateComponentProperties(component, component_name, yaml_section);
+ validateComponentProperties(component, component_name, section);
}
-void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent) {
+void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* parent) {
if (!parent) {
- logger_->log_error("parseFunnelsYaml: no parent group was provided");
+ logger_->log_error("parseFunnels: no parent group was provided");
return;
}
- if (!node || !node.IsSequence()) {
+ if (!node || !node.isSequence()) {
return;
}
- for (const auto& element : node) {
- const auto funnel_node = element.as<YAML::Node>();
-
+ for (const auto& funnel_node : node) {
std::string id = getOrGenerateId(funnel_node);
// Default name to be same as ID
- const auto name = funnel_node["name"].as<std::string>(id);
+ const auto name = funnel_node["name"].getString().value_or(id);
const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
logger_->log_debug("Incorrect funnel UUID format.");
throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
});
- auto funnel = std::make_unique<Funnel>(name, uuid.value());
+ auto funnel = std::make_unique<minifi::Funnel>(name, uuid.value());
logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
funnel->setScheduledState(core::RUNNING);
funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
@@ -787,22 +754,20 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGr
}
}
-void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGroup* parent, PortType port_type) {
if (!parent) {
logger_->log_error("parsePorts: no parent group was provided");
return;
}
- if (!node || !node.IsSequence()) {
+ if (!node || !node.isSequence()) {
return;
}
- for (const auto& element : node) {
- const auto port_node = element.as<YAML::Node>();
-
+ for (const auto& port_node : node) {
std::string id = getOrGenerateId(port_node);
// Default name to be same as ID
- const auto name = port_node["name"].as<std::string>(id);
+ const auto name = port_node["name"].getString().value_or(id);
const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
logger_->log_debug("Incorrect port UUID format.");
@@ -817,7 +782,8 @@ void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* p
}
}
-void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const {
+
+void StructuredConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const {
const auto &component_properties = component.getProperties();
// Validate required properties
@@ -825,10 +791,10 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
if (prop_pair.second.getRequired()) {
if (prop_pair.second.getValue().to_string().empty()) {
std::string reason = utils::StringUtils::join_pack("required property '", prop_pair.second.getName(), "' is not set");
- raiseComponentError(component_name, yaml_section, reason);
+ raiseComponentError(component_name, section, reason);
} else if (!prop_pair.second.getValue().validate(prop_pair.first).valid()) {
std::string reason = utils::StringUtils::join_pack("the value '", prop_pair.first, "' is not valid for property '", prop_pair.second.getName(), "'");
- raiseComponentError(component_name, yaml_section, reason);
+ raiseComponentError(component_name, section, reason);
}
}
}
@@ -845,12 +811,11 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
if (component_properties.at(dep_prop_key).getValue().to_string().empty()) {
std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
"' depends on property '", dep_prop_key, "' which is not set");
- raiseComponentError(component_name, yaml_section, reason);
+ raiseComponentError(component_name, section, reason);
}
}
}
-#ifdef YAML_CONFIGURATION_USE_REGEX
// Validate mutually-exclusive properties
for (const auto &prop_pair : component_properties) {
const auto &excl_props = prop_pair.second.getExclusiveOfProperties();
@@ -864,7 +829,7 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
if (utils::regexMatch(component_properties.at(excl_pair.first).getValue().to_string(), excl_expr)) {
std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
"' must not be set when the value of property '", excl_pair.first, "' matches '", excl_pair.second, "'");
- raiseComponentError(component_name, yaml_section, reason);
+ raiseComponentError(component_name, section, reason);
}
}
}
@@ -877,22 +842,18 @@ void YamlConfiguration::validateComponentProperties(ConfigurableComponent& compo
utils::Regex prop_regex(prop_regex_str);
if (!utils::regexMatch(prop_pair.second.getValue().to_string(), prop_regex)) {
std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), "' does not match validation pattern '", prop_regex_str, "'");
- raiseComponentError(component_name, yaml_section, reason);
+ raiseComponentError(component_name, section, reason);
}
}
}
-#else
- logging::LOG_INFO(logger_) << "Validation of mutally-exclusive properties is disabled in this build.";
- logging::LOG_INFO(logger_) << "Regex validation of properties is not available in this build.";
-#endif // YAML_CONFIGURATION_USE_REGEX
}
-void YamlConfiguration::raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const {
+void StructuredConfiguration::raiseComponentError(const std::string &component_name, const std::string &section, const std::string &reason) const {
std::string err_msg = "Unable to parse configuration file for component named '";
err_msg.append(component_name);
err_msg.append("' because " + reason);
- if (!yaml_section.empty()) {
- err_msg.append(" [in '" + yaml_section + "' section of configuration file]");
+ if (!section.empty()) {
+ err_msg.append(" [in '" + section + "' section of configuration file]");
}
logging::LOG_ERROR(logger_) << err_msg;
@@ -900,60 +861,57 @@ void YamlConfiguration::raiseComponentError(const std::string &component_name, c
throw std::invalid_argument(err_msg);
}
-std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) {
- std::string id;
- auto node = yamlNode.as<YAML::Node>();
-
- if (node[idField]) {
- if (YAML::NodeType::Scalar == node[idField].Type()) {
- id = node[idField].as<std::string>();
+std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std::string& id_field) {
+ if (node[id_field]) {
+ if (auto opt_id_str = node[id_field].getString()) {
+ auto id = opt_id_str.value();
addNewId(id);
return id;
}
- throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node of YAML::NodeType::Scalar.");
+ throw std::invalid_argument("getOrGenerateId: idField '" + id_field + "' is expected to contain string.");
}
- id = id_generator_->generate().to_string();
+ auto id = id_generator_->generate().to_string();
logger_->log_debug("Generating random ID: id => [%s]", id);
return id;
}
-std::string YamlConfiguration::getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section, std::string_view error_message) {
- yaml::checkRequiredField(yaml_node, "id", yaml_section, error_message);
- auto id = yaml_node["id"].as<std::string>();
+std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view section, std::string_view error_message) {
+ checkRequiredField(node, "id", section, error_message);
+ auto id = node["id"].getString().value();
addNewId(id);
return id;
}
-YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection,
- const std::string& providedInfoMessage) {
- std::string infoMessage = providedInfoMessage;
- auto result = yamlNode.as<YAML::Node>()[fieldName];
+std::string StructuredConfiguration::getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section,
+ const std::string& info_message) {
+ std::string infoMessage = info_message;
+ auto result = node[field_name];
if (!result) {
if (infoMessage.empty()) {
// Build a helpful info message for the user to inform them that a default is being used
infoMessage =
- yamlNode.as<YAML::Node>()["name"] ?
- "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" :
- "Using default value for optional field '" + fieldName + "' ";
- if (!yamlSection.empty()) {
- infoMessage += " [in '" + yamlSection + "' section of configuration file]: ";
+ node["name"] ?
+ "Using default value for optional field '" + field_name + "' in component named '" + node["name"].getString().value() + "'" :
+ "Using default value for optional field '" + field_name + "' ";
+ if (!section.empty()) {
+ infoMessage += " [in '" + section + "' section of configuration file]: ";
}
- infoMessage += defaultValue.as<std::string>();
+ infoMessage += default_value;
}
logging::LOG_INFO(logger_) << infoMessage;
- result = defaultValue;
+ return default_value;
}
- return result;
+ return result.getString().value();
}
-void YamlConfiguration::addNewId(const std::string& uuid) {
+void StructuredConfiguration::addNewId(const std::string& uuid) {
const auto [_, success] = uuids_.insert(uuid);
if (!success) {
throw Exception(ExceptionType::GENERAL_EXCEPTION, "UUID " + uuid + " is duplicated in the flow configuration");
}
}
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp
similarity index 73%
rename from libminifi/src/core/yaml/YamlConnectionParser.cpp
rename to libminifi/src/core/flow/StructuredConnectionParser.cpp
index 9435be361..f39a3ac8b 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp
@@ -16,22 +16,22 @@
* limitations under the License.
*/
-#include "core/yaml/YamlConnectionParser.h"
-#include "core/yaml/CheckRequiredField.h"
+#include "core/flow/StructuredConnectionParser.h"
+#include "core/flow/CheckRequiredField.h"
#include "Funnel.h"
-namespace org::apache::nifi::minifi::core::yaml {
+namespace org::apache::nifi::minifi::core::flow {
-void YamlConnectionParser::addNewRelationshipToConnection(const std::string& relationship_name, minifi::Connection& connection) const {
+void StructuredConnectionParser::addNewRelationshipToConnection(const std::string& relationship_name, minifi::Connection& connection) const {
core::Relationship relationship(relationship_name, "");
logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
connection.addRelationship(relationship);
}
-void YamlConnectionParser::addFunnelRelationshipToConnection(minifi::Connection& connection) const {
+void StructuredConnectionParser::addFunnelRelationshipToConnection(minifi::Connection& connection) const {
utils::Identifier srcUUID;
try {
- srcUUID = getSourceUUIDFromYaml();
+ srcUUID = getSourceUUID();
} catch(const std::exception&) {
return;
}
@@ -47,18 +47,18 @@ void YamlConnectionParser::addFunnelRelationshipToConnection(minifi::Connection&
}
}
-void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(minifi::Connection& connection) const {
+void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::Connection& connection) const {
// Configure connection source
- if (connectionNode_.as<YAML::Node>()["source relationship name"] && !connectionNode_["source relationship name"].as<std::string>().empty()) {
- addNewRelationshipToConnection(connectionNode_["source relationship name"].as<std::string>(), connection);
- } else if (connectionNode_.as<YAML::Node>()["source relationship names"]) {
+ if (connectionNode_["source relationship name"] && !connectionNode_["source relationship name"].getString().value().empty()) {
+ addNewRelationshipToConnection(connectionNode_["source relationship name"].getString().value(), connection);
+ } else if (connectionNode_["source relationship names"]) {
auto relList = connectionNode_["source relationship names"];
- if (relList.IsSequence() && relList.begin() != relList.end()) {
+ if (relList.isSequence() && !relList.empty()) {
for (const auto &rel : relList) {
- addNewRelationshipToConnection(rel.as<std::string>(), connection);
+ addNewRelationshipToConnection(rel.getString().value(), connection);
}
- } else if (!relList.IsSequence() && !relList.as<std::string>().empty()) {
- addNewRelationshipToConnection(relList.as<std::string>(), connection);
+ } else if (!relList.isSequence() && !relList.getString().value().empty()) {
+ addNewRelationshipToConnection(relList.getString().value(), connection);
} else {
addFunnelRelationshipToConnection(connection);
}
@@ -67,10 +67,9 @@ void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(minifi
}
}
-uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
- const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue size"];
- if (max_work_queue_data_size_node) {
- auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+uint64_t StructuredConnectionParser::getWorkQueueSize() const {
+ if (auto max_work_queue_data_size_node = connectionNode_["max work queue size"]) {
+ std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
uint64_t max_work_queue_size;
if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
logger_->log_debug("Setting %" PRIu64 " as the max queue size.", max_work_queue_size);
@@ -81,10 +80,10 @@ uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
return 0;
}
-uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
- const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+uint64_t StructuredConnectionParser::getWorkQueueDataSize() const {
+ const flow::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
if (max_work_queue_data_size_node) {
- auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+ std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value();
uint64_t max_work_queue_data_size = 0;
if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
logger_->log_debug("Setting %" PRIu64 "as the max as the max queue data size.", max_work_queue_data_size);
@@ -95,10 +94,10 @@ uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
return 0;
}
-uint64_t YamlConnectionParser::getSwapThresholdFromYaml() const {
- const YAML::Node swap_threshold_node = connectionNode_["swap threshold"];
+uint64_t StructuredConnectionParser::getSwapThreshold() const {
+ const flow::Node swap_threshold_node = connectionNode_["swap threshold"];
if (swap_threshold_node) {
- auto swap_threshold_str = swap_threshold_node.as<std::string>();
+ auto swap_threshold_str = swap_threshold_node.getString().value();
uint64_t swap_threshold;
if (core::Property::StringToInt(swap_threshold_str, swap_threshold)) {
logger_->log_debug("Setting %" PRIu64 " as the swap threshold.", swap_threshold);
@@ -109,20 +108,20 @@ uint64_t YamlConnectionParser::getSwapThresholdFromYaml() const {
return 0;
}
-utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
- const YAML::Node source_id_node = connectionNode_["source id"];
+utils::Identifier StructuredConnectionParser::getSourceUUID() const {
+ const flow::Node source_id_node = connectionNode_["source id"];
if (source_id_node) {
- const auto srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+ const auto srcUUID = utils::Identifier::parse(source_id_node.getString().value());
if (srcUUID) {
logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
return srcUUID.value();
}
- logger_->log_error("Invalid source id value: %s.", source_id_node.as<std::string>());
+ logger_->log_error("Invalid source id value: %s.", source_id_node.getString().value());
throw std::invalid_argument("Invalid source id");
}
// if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
- checkRequiredField(connectionNode_, "source name", CONFIG_YAML_CONNECTIONS_KEY);
- const auto connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+ checkRequiredField(connectionNode_, "source name", CONFIG_CONNECTIONS_KEY);
+ const auto connectionSrcProcName = connectionNode_["source name"].getString().value();
const auto srcUUID = utils::Identifier::parse(connectionSrcProcName);
if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
// the source name is a remote port id, so use that as the source id
@@ -141,21 +140,21 @@ utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
throw std::invalid_argument(error_msg);
}
-utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
- const YAML::Node destination_id_node = connectionNode_["destination id"];
+utils::Identifier StructuredConnectionParser::getDestinationUUID() const {
+ const flow::Node destination_id_node = connectionNode_["destination id"];
if (destination_id_node) {
- const auto destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+ const auto destUUID = utils::Identifier::parse(destination_id_node.getString().value());
if (destUUID) {
logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]", name_, destUUID.value().to_string());
return destUUID.value();
}
- logger_->log_error("Invalid destination id value: %s.", destination_id_node.as<std::string>());
+ logger_->log_error("Invalid destination id value: %s.", destination_id_node.getString().value());
throw std::invalid_argument("Invalid destination id");
}
// we use the same logic as above for resolving the source processor
// for looking up the destination processor in absence of a processor id
- checkRequiredField(connectionNode_, "destination name", CONFIG_YAML_CONNECTIONS_KEY);
- auto connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+ checkRequiredField(connectionNode_, "destination name", CONFIG_CONNECTIONS_KEY);
+ auto connectionDestProcName = connectionNode_["destination name"].getString().value();
const auto destUUID = utils::Identifier::parse(connectionDestProcName);
if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) {
// the destination name is a remote port id, so use that as the dest id
@@ -174,14 +173,14 @@ utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
throw std::invalid_argument(error_msg);
}
-std::chrono::milliseconds YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() const {
using namespace std::literals::chrono_literals;
- const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+ const flow::Node expiration_node = connectionNode_["flowfile expiration"];
if (!expiration_node) {
logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
return 0ms;
}
- auto expiration_duration = utils::timeutils::StringToDuration<std::chrono::milliseconds>(expiration_node.as<std::string>());
+ auto expiration_duration = utils::timeutils::StringToDuration<std::chrono::milliseconds>(expiration_node.getString().value());
if (!expiration_duration.has_value()) {
// We should throw here, but we do not.
// The reason is that our parser only accepts time formats that consists of a number and
@@ -196,12 +195,12 @@ std::chrono::milliseconds YamlConnectionParser::getFlowFileExpirationFromYaml()
return *expiration_duration;
}
-bool YamlConnectionParser::getDropEmptyFromYaml() const {
- const YAML::Node drop_empty_node = connectionNode_["drop empty"];
+bool StructuredConnectionParser::getDropEmpty() const {  // Reads the connection's "drop empty" key; yields false when the key is absent.
+ const flow::Node drop_empty_node = connectionNode_["drop empty"];  // a missing key produces a node that tests false below
if (drop_empty_node) {
- return utils::StringUtils::toBool(drop_empty_node.as<std::string>()).value_or(false);
+ return utils::StringUtils::toBool(drop_empty_node.getString().value()).value_or(false);  // text toBool cannot interpret falls back to false; NOTE(review): .value() presumably throws if the node holds no string - confirm against Node::getString
}
return false;
}
-} // namespace org::apache::nifi::minifi::core::yaml
+} // namespace org::apache::nifi::minifi::core::flow
diff --git a/libminifi/src/core/json/JsonConfiguration.cpp b/libminifi/src/core/json/JsonConfiguration.cpp
new file mode 100644
index 000000000..960f9a631
--- /dev/null
+++ b/libminifi/src/core/json/JsonConfiguration.cpp
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <vector>
+#include <set>
+#include <cinttypes>
+#include <variant>
+
+#include "core/json/JsonConfiguration.h"
+#include "core/json/JsonNode.h"
+#include "core/state/Value.h"
+#include "Defaults.h"
+#include "utils/TimeUtil.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+
+namespace org::apache::nifi::minifi::core {
+
+namespace {
+
+} // namespace
+
+
+JsonConfiguration::JsonConfiguration(ConfigurationContext ctx)  // Builds the StructuredConfiguration base from ctx (path defaulted if unset) plus a logger for JsonConfiguration.
+ : StructuredConfiguration(([&] {  // immediately-invoked lambda so ctx can be patched inside the base-class initializer
+ if (!ctx.path) {
+ ctx.path = DEFAULT_NIFI_CONFIG_JSON;  // caller supplied no path: fall back to DEFAULT_NIFI_CONFIG_JSON
+ }
+ return std::move(ctx);  // ctx is captured by reference, so the explicit move is what avoids copying it into the base
+ })(),
+ logging::LoggerFactory<JsonConfiguration>::getLogger()) {}
+
+std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRoot() {  // Reads config_path_ through filesystem_, parses it as JSON and returns getRootFrom() of the document; nullptr if nothing could be read.
+ if (!config_path_) {
+ logger_->log_error("Cannot instantiate flow, no config file is set.");
+ throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");  // the only FLOW_EXCEPTION thrown directly by this function
+ }
+ const auto configuration = filesystem_->read(config_path_.value());
+ if (!configuration) {
+ // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
+ return nullptr;
+ }
+ try {
+ rapidjson::Document doc;
+ rapidjson::ParseResult res = doc.Parse(configuration->c_str(), configuration->length());  // explicit length: parsing does not rely on the terminating NUL
+ if (!res) {
+ throw std::runtime_error("Could not parse json file");  // caught by the catch(...) below, logged, then rethrown
+ }
+ flow::Node root{std::make_shared<JsonNode>(&doc)};  // JsonNode receives the address of the stack-local doc; NOTE(review): assumes nothing built by getRootFrom keeps that pointer - confirm
+ return getRootFrom(root);
+ } catch(...) {  // covers both the parse failure above and anything thrown by getRootFrom
+ logger_->log_error("Invalid json configuration file");
+ throw;  // rethrow unchanged so callers still see the original exception
+ }
+}
+
+std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRootFromPayload(const std::string &json_config) {  // Parses json_config as a JSON flow definition and returns getRootFrom() of the document.
+ try {
+ rapidjson::Document doc;
+ rapidjson::ParseResult res = doc.Parse(json_config.c_str(), json_config.length());  // explicit length: parsing does not rely on the terminating NUL
+ if (!res) {
+ throw std::runtime_error("Could not parse json file");  // logged by the handler below, then rethrown
+ }
+ flow::Node root{std::make_shared<JsonNode>(&doc)};  // JsonNode receives the address of the stack-local doc; NOTE(review): assumes nothing built by getRootFrom keeps that pointer - confirm
+ return getRootFrom(root);
+ } catch (const std::runtime_error& err) {  // only std::runtime_error (and subclasses) is logged here; other exception types propagate without a log line
+ logger_->log_error("%s", err.what());  // the first argument is a printf-style format: pass the message as data so a '%' in it is never read as a conversion
+ throw;  // rethrow unchanged so callers still see the original exception
+ }
+}
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/yaml/CheckRequiredField.cpp b/libminifi/src/core/yaml/CheckRequiredField.cpp
deleted file mode 100644
index bb5b7abf8..000000000
--- a/libminifi/src/core/yaml/CheckRequiredField.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdexcept>
-
-#include "core/yaml/CheckRequiredField.h"
-#include "utils/StringUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace yaml {
-
-bool isFieldPresent(const YAML::Node &yaml_node, std::string_view field_name) {
- return bool{yaml_node.as<YAML::Node>()[field_name.data()]};
-}
-
-std::string buildErrorMessage(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_field_names, std::string_view yaml_section) {
- const YAML::Node name_node = yaml_node.as<YAML::Node>()["name"];
- // Build a helpful error message for the user so they can fix the
- // invalid YAML config file, using the component name if present
- auto field_list_string = utils::StringUtils::join(", ", alternate_field_names);
- std::string err_msg =
- name_node ?
- "Unable to parse configuration file for component named '" + name_node.as<std::string>() + "' as none of the possible required fields [" + field_list_string + "] is available" :
- "Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available";
- if (!yaml_section.empty()) {
- err_msg += " [in '" + std::string(yaml_section) + "' section of configuration file]";
- }
- const YAML::Mark mark = yaml_node.Mark();
- if (!mark.is_null()) {
- err_msg += " [line:column, pos at " + std::to_string(mark.line) + ":" + std::to_string(mark.column) + ", " + std::to_string(mark.pos) + "]";
- }
- return err_msg;
-}
-
-void checkRequiredField(const YAML::Node &yaml_node, std::string_view field_name, std::string_view yaml_section, std::string_view error_message) {
- if (!isFieldPresent(yaml_node, field_name)) {
- if (error_message.empty()) {
- throw std::invalid_argument(buildErrorMessage(yaml_node, std::vector<std::string>{std::string(field_name)}, yaml_section));
- }
- throw std::invalid_argument(error_message.data());
- }
-}
-
-std::string getRequiredField(const YAML::Node &yaml_node, const std::vector<std::string> &alternate_names, std::string_view yaml_section, std::string_view error_message) {
- for (const auto& name : alternate_names) {
- if (yaml::isFieldPresent(yaml_node, name)) {
- return yaml_node[name].as<std::string>();
- }
- }
- if (error_message.empty()) {
- throw std::invalid_argument(buildErrorMessage(yaml_node, alternate_names, yaml_section));
- }
- throw std::invalid_argument(error_message.data());
-}
-
-} // namespace yaml
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index f69f73de4..89092eee5 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -22,937 +22,63 @@
#include <cinttypes>
#include "core/yaml/YamlConfiguration.h"
-#include "core/yaml/CheckRequiredField.h"
-#include "core/yaml/YamlConnectionParser.h"
#include "core/state/Value.h"
#include "Defaults.h"
#include "utils/TimeUtil.h"
-#include "Funnel.h"
-
-#ifdef YAML_CONFIGURATION_USE_REGEX
+#include "yaml-cpp/yaml.h"
+#include "core/yaml/YamlNode.h"
#include "utils/RegexUtils.h"
-#endif // YAML_CONFIGURATION_USE_REGEX
namespace org::apache::nifi::minifi::core {
-std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator();
-
-YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo,
- const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory,
- const std::shared_ptr<Configure>& configuration, const std::optional<std::filesystem::path>& path,
- const std::shared_ptr<utils::file::FileSystem>& filesystem)
- : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration,
- path.value_or(DEFAULT_NIFI_CONFIG_YML), filesystem),
- stream_factory_(stream_factory),
- logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) {
- auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY];
- auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true);
- this->name_ = rootGroup->getName();
- return rootGroup;
-}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) {
- int version = 0;
-
- yaml::checkRequiredField(yamlNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto flowName = yamlNode["name"].as<std::string>();
-
- utils::Identifier uuid;
- // assignment throws on invalid uuid
- uuid = getOrGenerateId(yamlNode);
-
- if (yamlNode["version"]) {
- version = yamlNode["version"].as<int>();
- }
-
- logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName);
- std::unique_ptr<core::ProcessGroup> group;
- if (is_root) {
- group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version);
- } else {
- group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version);
- }
-
- if (yamlNode["onschedule retry interval"]) {
- auto onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
- logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
-
- auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
- if (on_schedule_retry_period_value.has_value() && group) {
- logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", on_schedule_retry_period_value->count());
- group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count());
- }
- }
-
- return group;
-}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) {
- auto group = createProcessGroup(headerNode, is_root);
- YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
- YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
- YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
- YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
- YAML::Node remoteProcessingGroupsNode = [&] {
- // assignment is not supported on invalid Yaml nodes
- YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
- if (candidate) {
- return candidate;
- }
- return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
- }();
- YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"];
-
- parseProcessorNodeYaml(processorsNode, group.get());
- parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
- parseFunnelsYaml(funnelsNode, group.get());
- parsePorts(inputPortsNode, group.get(), PortType::INPUT);
- parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
-
- if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
- for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
- auto childProcessGroupNode = it->as<YAML::Node>();
- group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
- }
- }
-
- // parse connections last to give feedback if the source and/or destination processors
- // is not in the same process group or input/output port connections are not allowed
- parseConnectionYaml(connectionsNode, group.get());
- return group;
-}
-
-std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) {
- uuids_.clear();
- YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
- YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY];
-
- parseControllerServices(controllerServiceNode);
- // Create the root process group
- std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode);
- parseProvenanceReportingYaml(provenanceReportNode, root.get());
-
- // set the controller services into the root group.
- for (const auto& controller_service : controller_services_->getAllControllerServices()) {
- root->addControllerService(controller_service->getName(), controller_service);
- root->addControllerService(controller_service->getUUIDStr(), controller_service);
- }
-
- root->verify();
-
- return root;
-}
-
-void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
- int64_t runDurationNanos = -1;
- utils::Identifier uuid;
- std::unique_ptr<core::Processor> processor;
-
- if (!parentGroup) {
- logger_->log_error("parseProcessNodeYaml: no parent group exists");
- return;
- }
-
- if (!processorsNode) {
- throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
- }
- if (!processorsNode.IsSequence()) {
- throw std::invalid_argument(
- "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
- }
- // Evaluate sequence of processors
- for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
- core::ProcessorConfig procCfg;
- const auto procNode = iter->as<YAML::Node>();
-
- yaml::checkRequiredField(procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
- procCfg.name = procNode["name"].as<std::string>();
- procCfg.id = getOrGenerateId(procNode);
-
- uuid = procCfg.id;
- logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);
- yaml::checkRequiredField(procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
- procCfg.javaClass = procNode["class"].as<std::string>();
- logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);
-
- // Determine the processor name only from the Java class
- auto lastOfIdx = procCfg.javaClass.find_last_of('.');
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- std::string processorName = procCfg.javaClass.substr(lastOfIdx);
- processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
- } else {
- // Allow unqualified class names for core processors
- processor = this->createProcessor(procCfg.javaClass, uuid);
- }
-
- if (!processor) {
- logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
- throw std::invalid_argument("Could not create processor " + procCfg.name);
- }
-
- processor->setName(procCfg.name);
-
- processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
-
- auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
- CONFIG_YAML_PROCESSORS_KEY);
- procCfg.schedulingStrategy = strategyNode.as<std::string>();
- logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
-
- auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
- CONFIG_YAML_PROCESSORS_KEY);
-
- procCfg.schedulingPeriod = periodNode.as<std::string>();
- logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
-
- if (procNode["max concurrent tasks"]) {
- procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
- logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
- }
-
- if (procNode["penalization period"]) {
- procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
- }
-
- if (procNode["yield period"]) {
- procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
- }
-
- if (procNode["run duration nanos"]) {
- procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
- logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
- }
-
- // handle auto-terminated relationships
- if (procNode["auto-terminated relationships list"]) {
- YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
- std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) {
- auto autoTerminatedRel = relIter->as<std::string>();
- rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
- }
- }
- procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
- }
-
- // handle processor properties
- if (procNode["Properties"]) {
- YAML::Node propertiesNode = procNode["Properties"];
- parsePropertiesNodeYaml(propertiesNode, *processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY);
- }
-
- // Take care of scheduling
-
- if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count());
- processor->setSchedulingPeriodNano(*scheduling_period);
- }
- } else {
- processor->setCronPeriod(procCfg.schedulingPeriod);
- }
-
- if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalization_period->count());
- processor->setPenalizationPeriod(penalization_period.value());
- }
-
- if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yield_period->count());
- processor->setYieldPeriodMsec(yield_period.value());
- }
-
- // Default to running
- processor->setScheduledState(core::RUNNING);
-
- if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
- processor->setSchedulingStrategy(core::TIMER_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
- } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- processor->setSchedulingStrategy(core::EVENT_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
- } else {
- processor->setSchedulingStrategy(core::CRON_DRIVEN);
- logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);
- }
-
- int32_t maxConcurrentTasks;
- if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
- logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
- }
-
- if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
- logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
- processor->setRunDurationNano(std::chrono::nanoseconds(runDurationNanos));
- }
-
- std::vector<core::Relationship> autoTerminatedRelationships;
- for (auto &&relString : procCfg.autoTerminatedRelationships) {
- core::Relationship relationship(relString, "");
- logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString);
- autoTerminatedRelationships.push_back(relationship);
- }
-
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- parentGroup->addProcessor(std::move(processor));
- }
-}
-
-void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) {
- utils::Identifier uuid;
- std::string id;
-
- if (!parentGroup) {
- logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists");
- return;
- }
-
- if (!rpgNode || !rpgNode.IsSequence()) {
- return;
- }
- for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) {
- auto currRpgNode = iter->as<YAML::Node>();
-
- yaml::checkRequiredField(currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto name = currRpgNode["name"].as<std::string>();
- id = getOrGenerateId(currRpgNode);
-
- logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
-
- auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""),
- CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-
- auto url = urlNode.as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
-
- uuid = id;
- auto group = this->createRemoteProcessGroup(name, uuid);
- group->setParent(parentGroup);
-
- if (currRpgNode["yield period"]) {
- auto yieldPeriod = currRpgNode["yield period"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
-
- auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
- if (yield_period_value.has_value() && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
- group->setYieldPeriodMsec(*yield_period_value);
- }
- }
-
- if (currRpgNode["timeout"]) {
- auto timeout = currRpgNode["timeout"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
-
- auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
- if (timeout_value.has_value() && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
- group->setTimeout(timeout_value->count());
- }
- }
-
- if (currRpgNode["local network interface"]) {
- auto interface = currRpgNode["local network interface"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface);
- group->setInterface(interface);
- }
-
- if (currRpgNode["transport protocol"]) {
- auto transport_protocol = currRpgNode["transport protocol"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol);
- if (transport_protocol == "HTTP") {
- group->setTransportProtocol(transport_protocol);
- if (currRpgNode["proxy host"]) {
- auto http_proxy_host = currRpgNode["proxy host"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host);
- group->setHttpProxyHost(http_proxy_host);
- if (currRpgNode["proxy user"]) {
- auto http_proxy_username = currRpgNode["proxy user"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username);
- group->setHttpProxyUserName(http_proxy_username);
- }
- if (currRpgNode["proxy password"]) {
- auto http_proxy_password = currRpgNode["proxy password"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password);
- group->setHttpProxyPassWord(http_proxy_password);
+YamlConfiguration::YamlConfiguration(ConfigurationContext ctx)
+ : StructuredConfiguration(([&] {
+ if (!ctx.path) {
+ ctx.path = DEFAULT_NIFI_CONFIG_YML;
}
- if (currRpgNode["proxy port"]) {
- auto http_proxy_port = currRpgNode["proxy port"].as<std::string>();
- int32_t port;
- if (core::Property::StringToInt(http_proxy_port, port)) {
- logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port);
- group->setHttpProxyPort(port);
- }
- }
- }
- } else if (transport_protocol == "RAW") {
- group->setTransportProtocol(transport_protocol);
- } else {
- std::stringstream stream;
- stream << "Invalid transport protocol " << transport_protocol;
- throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str());
- }
- }
-
- group->setTransmitting(true);
- group->setURL(url);
-
- yaml::checkRequiredField(currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
- if (inputPorts && inputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
- auto currPort = portIter->as<YAML::Node>();
-
- this->parsePortYaml(currPort, group.get(), sitetosite::SEND);
- } // for node
- }
- auto outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
- if (outputPorts && outputPorts.IsSequence()) {
- for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) {
- logger_->log_debug("Got a current port, iterating...");
-
- auto currPort = portIter->as<YAML::Node>();
-
- this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE);
- } // for node
- }
- parentGroup->addProcessGroup(std::move(group));
- }
-}
-
-void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
- utils::Identifier port_uuid;
-
- if (!parentGroup) {
- logger_->log_error("parseProvenanceReportingYaml: no parent group exists");
- return;
- }
-
- if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) {
- logger_->log_debug("no provenance reporting task specified");
- return;
- }
-
- auto reportTask = createProvenanceReportTask();
-
- const auto node = reportNode.as<YAML::Node>();
-
- yaml::checkRequiredField(node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
- yaml::checkRequiredField(node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
-
- if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
- logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
- reportTask->setSchedulingPeriodNano(*scheduling_period);
- }
-
- if (schedulingStrategyStr == "TIMER_DRIVEN") {
- reportTask->setSchedulingStrategy(core::TIMER_DRIVEN);
- logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr);
- } else {
- throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr);
- }
-
- int64_t lvalue;
- if (node["host"] && node["port"]) {
- auto hostStr = node["host"].as<std::string>();
-
- auto portStr = node["port"].as<std::string>();
- if (core::Property::StringToInt(portStr, lvalue) && !hostStr.empty()) {
- logger_->log_debug("ProvenanceReportingTask port %" PRId64, lvalue);
- std::string url = hostStr + ":" + portStr;
- reportTask->setURL(url);
- }
- }
-
- if (node["url"]) {
- auto urlStr = node["url"].as<std::string>();
- if (!urlStr.empty()) {
- reportTask->setURL(urlStr);
- logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
- }
- }
- yaml::checkRequiredField(node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto portUUIDStr = node["port uuid"].as<std::string>();
- yaml::checkRequiredField(node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
- auto batchSizeStr = node["batch size"].as<std::string>();
-
- logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
- port_uuid = portUUIDStr;
- reportTask->setPortUUID(port_uuid);
-
- if (core::Property::StringToInt(batchSizeStr, lvalue)) {
- reportTask->setBatchSize(gsl::narrow<int>(lvalue));
- }
-
- reportTask->initialize();
-
- // add processor to parent
- reportTask->setScheduledState(core::RUNNING);
- parentGroup->addProcessor(std::move(reportTask));
-}
-
-void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) {
- if (!controllerServicesNode || !controllerServicesNode.IsSequence()) {
- return;
- }
- for (const auto& iter : controllerServicesNode) {
- const auto controllerServiceNode = iter.as<YAML::Node>();
- try {
- yaml::checkRequiredField(controllerServiceNode, "name", CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-
- auto type = yaml::getRequiredField(controllerServiceNode, std::vector<std::string>{"class", "type"}, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- logger_->log_debug("Using type %s for controller service node", type);
-
- std::string fullType = type;
- auto lastOfIdx = type.find_last_of('.');
- if (lastOfIdx != std::string::npos) {
- lastOfIdx++; // if a value is found, increment to move beyond the .
- type = type.substr(lastOfIdx);
- }
-
- auto name = controllerServiceNode["name"].as<std::string>();
- auto id = getRequiredIdField(controllerServiceNode, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-
- utils::Identifier uuid;
- uuid = id;
- std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid);
- if (nullptr != controller_service_node) {
- logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name);
- controller_service_node->initialize();
- YAML::Node propertiesNode = controllerServiceNode["Properties"];
- // we should propagate properties to the node and to the implementation
- parsePropertiesNodeYaml(propertiesNode, *controller_service_node, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) {
- parsePropertiesNodeYaml(propertiesNode, *controllerServiceImpl, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- }
- } else {
- logger_->log_debug("Could not locate %s", type);
- }
- controller_services_->put(id, controller_service_node);
- controller_services_->put(name, controller_service_node);
- } catch (YAML::InvalidNode &) {
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services");
- }
- }
-}
-
-void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) {
- if (!parent) {
- logger_->log_error("parseProcessNode: no parent group was provided");
- return;
- }
- if (!connectionsNode || !connectionsNode.IsSequence()) {
- return;
- }
-
- for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) {
- const auto connectionNode = iter->as<YAML::Node>();
-
- // Configure basic connection
- const std::string id = getOrGenerateId(connectionNode);
-
- // Default name to be same as ID
- // If name is specified in configuration, use the value
- const auto name = connectionNode["name"].as<std::string>(id);
-
- const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
- logger_->log_debug("Incorrect connection UUID format.");
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect connection UUID format.");
- });
-
- auto connection = createConnection(name, uuid.value());
- logger_->log_debug("Created connection with UUID %s and name %s", id, name);
- const yaml::YamlConnectionParser connectionParser(connectionNode, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
- connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection);
- connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
- connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
- connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml());
- connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
- connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
- connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
- connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml());
-
- parent->addConnection(std::move(connection));
- }
-}
+ return std::move(ctx);
+ })(),
+ logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) {
- utils::Identifier uuid;
-
- if (!parent) {
- logger_->log_error("parseProcessNode: no parent group existed");
- return;
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRoot() {
+ if (!config_path_) {
+ logger_->log_error("Cannot instantiate flow, no config file is set.");
+ throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified");
}
-
- const auto inputPortsObj = portNode.as<YAML::Node>();
-
- // Check for required fields
- yaml::checkRequiredField(inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
- auto nameStr = inputPortsObj["name"].as<std::string>();
- auto portId = getRequiredIdField(inputPortsObj, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
- "The field 'id' is required for "
- "the port named '" + nameStr + "' in the YAML Config. If this port "
- "is an input port for a NiFi Remote Process Group, the port "
- "id should match the corresponding id specified in the NiFi configuration. "
- "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
- uuid = portId;
-
- auto port = std::make_unique<minifi::RemoteProcessorGroupPort>(
- stream_factory_, nameStr, parent->getURL(), this->configuration_, uuid);
- port->setDirection(direction);
- port->setTimeout(parent->getTimeout());
- port->setTransmitting(true);
- port->setYieldPeriodMsec(parent->getYieldPeriodMsec());
- port->initialize();
- if (!parent->getInterface().empty())
- port->setInterface(parent->getInterface());
- if (parent->getTransportProtocol() == "HTTP") {
- port->enableHTTP();
- if (!parent->getHttpProxyHost().empty())
- port->setHTTPProxy(parent->getHTTPProxy());
+ const auto configuration = filesystem_->read(config_path_.value());
+ if (!configuration) {
+ // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network
+ return nullptr;
}
- // else defaults to RAW
-
- // handle port properties
- const auto nodeVal = portNode.as<YAML::Node>();
- YAML::Node propertiesNode = nodeVal["Properties"];
- parsePropertiesNodeYaml(propertiesNode, *port, nameStr, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-
- // add processor to parent
- auto& processor = *port;
- parent->addProcessor(std::move(port));
- processor.setScheduledState(core::RUNNING);
-
- if (inputPortsObj["max concurrent tasks"]) {
- auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
- int32_t maxConcurrentTasks;
- if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
- processor.setMaxConcurrentTasks(maxConcurrentTasks);
- }
- logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
- processor.setMaxConcurrentTasks(maxConcurrentTasks);
- }
-}
-
-void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component) {
- for (const auto& iter : propertyValueNode) {
- if (iter.IsDefined()) {
- const auto nodeVal = iter.as<YAML::Node>();
- YAML::Node propertiesNode = nodeVal["value"];
- // must insert the sequence in differently.
- const auto rawValueString = propertiesNode.as<std::string>();
- logger_->log_debug("Found %s=%s", propertyName, rawValueString);
- if (!component.updateProperty(propertyName, rawValueString)) {
- auto proc = dynamic_cast<core::Connectable*>(&component);
- if (proc) {
- logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
- if (!component.setDynamicProperty(propertyName, rawValueString)) {
- logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
- } else {
- logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
- }
- }
- }
- }
- }
-}
-
-PropertyValue YamlConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode) {
- PropertyValue defaultValue;
- defaultValue = propertyFromProcessor.getDefaultValue();
- const std::type_index defaultType = defaultValue.getTypeInfo();
try {
- PropertyValue coercedValue = defaultValue;
- if (defaultType == typeid(int64_t)) {
- coercedValue = propertyValueNode.as<int64_t>();
- } else if (defaultType == typeid(uint64_t)) {
- uint64_t integer_value;
- if (YAML::convert<uint64_t>::decode(propertyValueNode, integer_value)) {
- coercedValue = integer_value;
- } else {
- coercedValue = propertyValueNode.as<std::string>();
- }
- } else if (defaultType == typeid(int)) {
- coercedValue = propertyValueNode.as<int>();
- } else if (defaultType == typeid(bool)) {
- coercedValue = propertyValueNode.as<bool>();
- } else {
- coercedValue = propertyValueNode.as<std::string>();
- }
- return coercedValue;
- } catch (const std::exception& e) {
- logger_->log_error("Fetching property failed with an exception of %s", e.what());
- logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
- } catch (...) {
- logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>());
+ YAML::Node rootYamlNode = YAML::Load(configuration.value());
+ flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+ return getRootFrom(root);
+ } catch(...) {
+ logger_->log_error("Invalid yaml configuration file");
+ throw;
}
- return defaultValue;
}
-void YamlConfiguration::parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
- core::Property myProp(propertyName, "", "");
- processor.getProperty(propertyName, myProp);
- const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, propertyValueNode);
- bool property_set = false;
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(std::istream &yamlConfigStream) {
try {
- property_set = processor.setProperty(myProp, coercedValue);
- } catch(const utils::internal::InvalidValueException&) {
- auto component = dynamic_cast<core::CoreComponent*>(&processor);
- if (component == nullptr) {
- logger_->log_error("processor was not a CoreComponent for property '%s'", propertyName);
- } else {
- logger_->log_error("Invalid value was set for property '%s' creating component '%s'", propertyName, component->getName());
- }
+ YAML::Node rootYamlNode = YAML::Load(yamlConfigStream);
+ flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+ return getRootFrom(root);
+ } catch (const YAML::ParserException &pe) {
+ logger_->log_error(pe.what());
throw;
}
- const auto rawValueString = propertyValueNode.as<std::string>();
- if (!property_set) {
- auto proc = dynamic_cast<core::Connectable*>(&processor);
- if (proc) {
- logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName());
- if (!processor.setDynamicProperty(propertyName, rawValueString)) {
- logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString);
- } else {
- logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString);
- }
- }
- } else {
- logger_->log_debug("Property %s with value %s set", propertyName, rawValueString);
- }
-}
-
-void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) {
- logger_->log_trace("Encountered %s", propertyName);
- if (propertyValueNode.IsNull() || !propertyValueNode.IsDefined()) {
- return;
- }
- if (propertyValueNode.IsSequence()) {
- parsePropertyValueSequence(propertyName, propertyValueNode, processor);
- } else {
- parseSingleProperty(propertyName, propertyValueNode, processor);
- }
-}
-
-void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name,
- const std::string& yaml_section) {
- // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated
- logger_->log_trace("Entered %s", component_name);
- for (const auto& propertyElem : propertiesNode) {
- const auto propertyName = propertyElem.first.as<std::string>();
- const YAML::Node propertyValueNode = propertyElem.second;
- parsePropertyNodeElement(propertyName, propertyValueNode, component);
- }
-
- validateComponentProperties(component, component_name, yaml_section);
-}
-
-void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent) {
- if (!parent) {
- logger_->log_error("parseFunnelsYaml: no parent group was provided");
- return;
- }
- if (!node || !node.IsSequence()) {
- return;
- }
-
- for (const auto& element : node) {
- const auto funnel_node = element.as<YAML::Node>();
-
- std::string id = getOrGenerateId(funnel_node);
-
- // Default name to be same as ID
- const auto name = funnel_node["name"].as<std::string>(id);
-
- const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
- logger_->log_debug("Incorrect funnel UUID format.");
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
- });
-
- auto funnel = std::make_unique<Funnel>(name, uuid.value());
- logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
- funnel->setScheduledState(core::RUNNING);
- funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
- parent->addProcessor(std::move(funnel));
- }
-}
-
-void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
- if (!parent) {
- logger_->log_error("parsePorts: no parent group was provided");
- return;
- }
- if (!node || !node.IsSequence()) {
- return;
- }
-
- for (const auto& element : node) {
- const auto port_node = element.as<YAML::Node>();
-
- std::string id = getOrGenerateId(port_node);
-
- // Default name to be same as ID
- const auto name = port_node["name"].as<std::string>(id);
-
- const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
- logger_->log_debug("Incorrect port UUID format.");
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID format.");
- });
-
- auto port = std::make_unique<Port>(name, uuid.value(), port_type);
- logger_->log_debug("Created port UUID %s and name %s", id, name);
- port->setScheduledState(core::RUNNING);
- port->setSchedulingStrategy(core::EVENT_DRIVEN);
- parent->addPort(std::move(port));
- }
-}
-
-void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const {
- const auto &component_properties = component.getProperties();
-
- // Validate required properties
- for (const auto &prop_pair : component_properties) {
- if (prop_pair.second.getRequired()) {
- if (prop_pair.second.getValue().to_string().empty()) {
- std::string reason = utils::StringUtils::join_pack("required property '", prop_pair.second.getName(), "' is not set");
- raiseComponentError(component_name, yaml_section, reason);
- } else if (!prop_pair.second.getValue().validate(prop_pair.first).valid()) {
- std::string reason = utils::StringUtils::join_pack("the value '", prop_pair.first, "' is not valid for property '", prop_pair.second.getName(), "'");
- raiseComponentError(component_name, yaml_section, reason);
- }
- }
- }
-
- // Validate dependent properties
- for (const auto &prop_pair : component_properties) {
- const auto &dep_props = prop_pair.second.getDependentProperties();
-
- if (prop_pair.second.getValue().to_string().empty()) {
- continue;
- }
-
- for (const auto &dep_prop_key : dep_props) {
- if (component_properties.at(dep_prop_key).getValue().to_string().empty()) {
- std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
- "' depends on property '", dep_prop_key, "' which is not set");
- raiseComponentError(component_name, yaml_section, reason);
- }
- }
- }
-
-#ifdef YAML_CONFIGURATION_USE_REGEX
- // Validate mutually-exclusive properties
- for (const auto &prop_pair : component_properties) {
- const auto &excl_props = prop_pair.second.getExclusiveOfProperties();
-
- if (prop_pair.second.getValue().empty()) {
- continue;
- }
-
- for (const auto &excl_pair : excl_props) {
- utils::Regex excl_expr(excl_pair.second);
- if (utils::regexMatch(component_properties.at(excl_pair.first).getValue().to_string(), excl_expr)) {
- std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(),
- "' must not be set when the value of property '", excl_pair.first, "' matches '", excl_pair.second, "'");
- raiseComponentError(component_name, yaml_section, reason);
- }
- }
- }
-
- // Validate regex properties
- for (const auto &prop_pair : component_properties) {
- const auto &prop_regex_str = prop_pair.second.getValidRegex();
-
- if (!prop_regex_str.empty()) {
- utils::Regex prop_regex(prop_regex_str);
- if (!utils::regexMatch(prop_pair.second.getValue().to_string(), prop_regex)) {
- std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), "' does not match validation pattern '", prop_regex_str, "'");
- raiseComponentError(component_name, yaml_section, reason);
- }
- }
- }
-#else
- logging::LOG_INFO(logger_) << "Validation of mutally-exclusive properties is disabled in this build.";
- logging::LOG_INFO(logger_) << "Regex validation of properties is not available in this build.";
-#endif // YAML_CONFIGURATION_USE_REGEX
-}
-
-void YamlConfiguration::raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const {
- std::string err_msg = "Unable to parse configuration file for component named '";
- err_msg.append(component_name);
- err_msg.append("' because " + reason);
- if (!yaml_section.empty()) {
- err_msg.append(" [in '" + yaml_section + "' section of configuration file]");
- }
-
- logging::LOG_ERROR(logger_) << err_msg;
-
- throw std::invalid_argument(err_msg);
-}
-
-std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) {
- std::string id;
- auto node = yamlNode.as<YAML::Node>();
-
- if (node[idField]) {
- if (YAML::NodeType::Scalar == node[idField].Type()) {
- id = node[idField].as<std::string>();
- addNewId(id);
- return id;
- }
- throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node of YAML::NodeType::Scalar.");
- }
-
- id = id_generator_->generate().to_string();
- logger_->log_debug("Generating random ID: id => [%s]", id);
- return id;
}
-std::string YamlConfiguration::getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section, std::string_view error_message) {
- yaml::checkRequiredField(yaml_node, "id", yaml_section, error_message);
- auto id = yaml_node["id"].as<std::string>();
- addNewId(id);
- return id;
-}
-
-YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection,
- const std::string& providedInfoMessage) {
- std::string infoMessage = providedInfoMessage;
- auto result = yamlNode.as<YAML::Node>()[fieldName];
- if (!result) {
- if (infoMessage.empty()) {
- // Build a helpful info message for the user to inform them that a default is being used
- infoMessage =
- yamlNode.as<YAML::Node>()["name"] ?
- "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" :
- "Using default value for optional field '" + fieldName + "' ";
- if (!yamlSection.empty()) {
- infoMessage += " [in '" + yamlSection + "' section of configuration file]: ";
- }
-
- infoMessage += defaultValue.as<std::string>();
- }
- logging::LOG_INFO(logger_) << infoMessage;
- result = defaultValue;
- }
-
- return result;
-}
-
-void YamlConfiguration::addNewId(const std::string& uuid) {
- const auto [_, success] = uuids_.insert(uuid);
- if (!success) {
- throw Exception(ExceptionType::GENERAL_EXCEPTION, "UUID " + uuid + " is duplicated in the flow configuration");
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRootFromPayload(const std::string &yamlConfigPayload) {
+ try {
+ YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
+ flow::Node root{std::make_shared<YamlNode>(rootYamlNode)};
+ return getRootFrom(root);
+ } catch (const YAML::ParserException &pe) {
+ logger_->log_error(pe.what());
+ throw;
}
}
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index b72a188a9..684d71988 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -63,7 +63,7 @@ class TestControllerWithFlow: public TestController{
REQUIRE(content_repo->initialize(configuration_));
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration_);
- auto flow = std::make_unique<core::YamlConfiguration>(prov_repo, ff_repo, content_repo, stream_factory, configuration_, yaml_path_.string());
+ auto flow = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{prov_repo, ff_repo, content_repo, stream_factory, configuration_, yaml_path_.string()});
auto root = flow->getRoot();
root_ = root.get();
controller_ = std::make_shared<minifi::FlowController>(
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 0e6abe3c4..90b20e627 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -174,7 +174,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_
filesystem = std::make_shared<utils::file::FileSystem>();
}
- auto flow_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem);
+ auto flow_config = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location, filesystem});
auto controller_service_provider = flow_config->getControllerServiceProvider();
char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 902d81671..5344722ff 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -57,13 +57,13 @@ int main(int argc, char **argv) {
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::make_unique<core::YamlConfiguration>(
- test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
const auto controller = std::make_shared<minifi::FlowController>(
test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
std::make_shared<utils::file::FileSystem>(), []{});
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config({test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location});
std::shared_ptr<core::ProcessGroup> pg = yaml_config.getRoot();
std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>());
diff --git a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
index 20831f7c2..e3797479c 100644
--- a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
@@ -95,7 +95,7 @@ class PersistableKeyValueStoreServiceTestsFixture {
content_repo->initialize(configuration);
stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- yaml_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml);
+ yaml_config = std::make_unique<core::YamlConfiguration>(core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml});
process_group = yaml_config->getRoot();
persistable_key_value_store_service_node = process_group->findControllerService("testcontroller");
diff --git a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
index c0853f2d1..5058ebebc 100644
--- a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
@@ -88,7 +88,8 @@ class UnorderedMapKeyValueStoreServiceTestFixture {
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::unique_ptr<core::YamlConfiguration> yaml_config = std::make_unique<core::YamlConfiguration>(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml);
+ std::unique_ptr<core::YamlConfiguration> yaml_config = std::make_unique<core::YamlConfiguration>(
+ core::ConfigurationContext{test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml});
std::unique_ptr<core::ProcessGroup> process_group;
std::shared_ptr<core::controller::ControllerServiceNode> key_value_store_service_node;
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index d9bc6dc69..0dcbb4c40 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -184,7 +184,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
ff_repository->initialize(config);
content_repo->initialize(config);
- auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
+ auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{prov_repo, ff_repository, content_repo, nullptr, config, ""});
auto flowController = std::make_shared<minifi::FlowController>(
prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
@@ -304,7 +304,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
ff_repository->initialize(config);
content_repo->initialize(config);
- auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
+ auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{prov_repo, ff_repository, content_repo, nullptr, config, ""});
auto flowController = std::make_shared<minifi::FlowController>(
prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 94ec4ddfb..520c0629d 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -297,7 +297,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
auto inputPtr = input.get();
root->addConnection(std::move(input));
- auto flowConfig = std::make_unique<core::FlowConfiguration>(prov_repo, ff_repository, content_repo, nullptr, config, "");
+ auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{prov_repo, ff_repository, content_repo, nullptr, config, ""});
auto flowController = std::make_shared<minifi::FlowController>(
prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", std::make_shared<utils::file::FileSystem>(), []{});
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index c15449677..a9ef41bab 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -356,8 +356,14 @@ int main(int argc, char **argv) {
utils::crypto::EncryptionProvider::create(minifiHome));
std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(
- prov_repo, flow_repo, content_repo, configure, stream_factory, nifi_configuration_class_name,
- configure->get(minifi::Configure::nifi_flow_configuration_file), filesystem);
+ core::ConfigurationContext{
+ .repo = prov_repo,
+ .flow_file_repo = flow_repo,
+ .content_repo = content_repo,
+ .stream_factory = stream_factory,
+ .configuration = configure,
+ .path = configure->get(minifi::Configure::nifi_flow_configuration_file),
+ .filesystem = filesystem}, nifi_configuration_class_name);
const auto controller = std::make_unique<minifi::FlowController>(
prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, filesystem, request_restart);