You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/08/28 17:01:28 UTC
[2/2] nifi-minifi-cpp git commit: MINIFICPP-601: Add scheduling
information
MINIFICPP-601: Add scheduling information
This closes #392.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2ecb997c
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2ecb997c
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2ecb997c
Branch: refs/heads/master
Commit: 2ecb997c72a5b6d80fb73d1eaf67b1e30cbf5c31
Parents: 7ac309b
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Aug 21 20:22:57 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Aug 28 12:59:54 2018 -0400
----------------------------------------------------------------------
libminifi/include/core/Processor.h | 9 +-
libminifi/include/core/ProcessorConfig.h | 8 ++
.../include/core/state/nodes/AgentInformation.h | 58 ++++++++
libminifi/src/core/Processor.cpp | 5 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 6 +-
libminifi/test/unit/ManifestTests.cpp | 143 +++++++++++++++++++
.../unit/PropertyValidationAgentInfoTests.cpp | 120 ----------------
7 files changed, 218 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 18fe2a0..6426f4c 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -53,9 +53,6 @@ namespace core {
// Minimum scheduling period in Nano Second
#define MINIMUM_SCHEDULING_NANOS 30000
-// Default yield period in second
-#define DEFAULT_YIELD_PERIOD_SECONDS 1
-
// Default penalization period in second
#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
@@ -107,11 +104,11 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub
}
// Set Processor Run Duration in Nano Second
void setRunDurationNano(uint64_t period) {
- run_durantion_nano_ = period;
+ run_duration_nano_ = period;
}
// Get Processor Run Duration in Nano Second
uint64_t getRunDurationNano(void) {
- return (run_durantion_nano_);
+ return (run_duration_nano_);
}
// Set Processor yield period in MilliSecond
void setYieldPeriodMsec(uint64_t period) {
@@ -258,7 +255,7 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub
// SchedulePeriod in Nano Seconds
std::atomic<uint64_t> scheduling_period_nano_;
// Run Duration in Nano Seconds
- std::atomic<uint64_t> run_durantion_nano_;
+ std::atomic<uint64_t> run_duration_nano_;
// Yield Period in Milliseconds
std::atomic<uint64_t> yield_period_msec_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/ProcessorConfig.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index c1d563e..aa8e18c 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -26,6 +26,14 @@ namespace nifi {
namespace minifi {
namespace core {
+
+#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN"
+#define DEFAULT_SCHEDULING_PERIOD "1 sec"
+#define DEFAULT_RUN_DURATION 0
+#define DEFAULT_MAX_CONCURRENT_TASKS 1
+// Default yield period in second
+#define DEFAULT_YIELD_PERIOD_SECONDS 1
+
struct ProcessorConfig {
std::string id;
std::string name;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/state/nodes/AgentInformation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index baafbbe..48a451c 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -48,6 +48,7 @@
#include "agent/build_description.h"
#include "core/ClassLoader.h"
#include "../nodes/StateMonitor.h"
+#include "core/ProcessorConfig.h"
namespace org {
namespace apache {
@@ -284,6 +285,57 @@ class Bundles : public DeviceInformation {
};
+class SchedulingDefaults : public DeviceInformation {
+ public:
+ SchedulingDefaults(std::string name, uuid_t uuid)
+ : DeviceInformation(name, uuid) {
+ }
+
+ SchedulingDefaults(const std::string &name)
+ : DeviceInformation(name, 0) {
+ }
+
+ std::string getName() const {
+ return "schedulingDefaults";
+ }
+
+ std::vector<SerializedResponseNode> serialize() {
+ std::vector<SerializedResponseNode> serialized;
+
+ SerializedResponseNode schedulingDefaults;
+ schedulingDefaults.name = "schedulingDefaults";
+
+ SerializedResponseNode defaultSchedulingStrategy;
+ defaultSchedulingStrategy.name = "defaultSchedulingStrategy";
+ defaultSchedulingStrategy.value = DEFAULT_SCHEDULING_STRATEGY;
+
+ schedulingDefaults.children.push_back(defaultSchedulingStrategy);
+
+ SerializedResponseNode defaultSchedulingPeriod;
+ defaultSchedulingPeriod.name = "defaultSchedulingPeriod";
+ defaultSchedulingPeriod.value = DEFAULT_SCHEDULING_PERIOD;
+
+ schedulingDefaults.children.push_back(defaultSchedulingPeriod);
+
+ SerializedResponseNode defaultRunDuration;
+ defaultRunDuration.name = "defaultRunDurationNanos";
+ defaultRunDuration.value = DEFAULT_RUN_DURATION;
+
+ schedulingDefaults.children.push_back(defaultRunDuration);
+
+ SerializedResponseNode defaultMaxConcurrentTasks;
+ defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks";
+ defaultMaxConcurrentTasks.value = DEFAULT_MAX_CONCURRENT_TASKS;
+
+ schedulingDefaults.children.push_back(defaultMaxConcurrentTasks);
+
+ serialized.push_back(schedulingDefaults);
+
+ return serialized;
+ }
+
+};
+
/**
* Justification and Purpose: Provides available extensions for the agent information block.
*/
@@ -491,6 +543,12 @@ class AgentManifest : public DeviceInformation {
serialized.push_back(bundle);
}
+ SchedulingDefaults defaults("schedulingDefaults", nullptr);
+
+ for (auto defaultNode : defaults.serialize()) {
+ serialized.push_back(defaultNode);
+ }
+
return serialized;
}
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7adf718..d49bd55 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -31,6 +31,7 @@
#include <functional>
#include <utility>
#include "Connection.h"
+#include "core/ProcessorConfig.h"
#include "core/Connectable.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -55,10 +56,10 @@ Processor::Processor(std::string name, uuid_t uuid)
loss_tolerant_ = false;
_triggerWhenEmpty = false;
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
- run_durantion_nano_ = 0;
+ run_duration_nano_ = DEFAULT_RUN_DURATION;
yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
- max_concurrent_tasks_ = 1;
+ max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 902bd89..c850afc 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -125,14 +125,14 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
auto strategyNode = getOptionalField(&procNode,
"scheduling strategy",
- YAML::Node("EVENT_DRIVEN"),
+ YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
CONFIG_YAML_PROCESSORS_KEY);
procCfg.schedulingStrategy = strategyNode.as<std::string>();
logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
auto periodNode = getOptionalField(&procNode,
"scheduling period",
- YAML::Node("1 sec"),
+ YAML::Node(DEFAULT_SCHEDULING_PERIOD),
CONFIG_YAML_PROCESSORS_KEY);
procCfg.schedulingPeriod = periodNode.as<std::string>();
logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
@@ -153,7 +153,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
}
if (procNode["run duration nanos"]) {
- procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+ procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/test/unit/ManifestTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ManifestTests.cpp b/libminifi/test/unit/ManifestTests.cpp
new file mode 100644
index 0000000..09e278a
--- /dev/null
+++ b/libminifi/test/unit/ManifestTests.cpp
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "../../include/core/Processor.h"
+#include "../../include/core/state/nodes/AgentInformation.h"
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "core/ClassLoader.h"
+
+TEST_CASE("Test Required", "[required]") {
+ minifi::state::response::ComponentManifest manifest("minifi-system");
+ auto serialized = manifest.serialize();
+ REQUIRE(serialized.size() > 0);
+ const auto &resp = serialized[0];
+ REQUIRE(resp.children.size() > 0);
+ const auto &processors = resp.children[0];
+ REQUIRE(processors.children.size() > 0);
+ const auto &proc_0 = processors.children[0];
+ REQUIRE(proc_0.children.size() > 0);
+ const auto &prop_descriptors = proc_0.children[0];
+ REQUIRE(prop_descriptors.children.size() > 0);
+ const auto &prop_0 = prop_descriptors.children[0];
+ REQUIRE(prop_0.children.size() >= 3);
+ const auto &prop_0_required = prop_0.children[2];
+ REQUIRE("required" == prop_0_required.name);
+ REQUIRE(!std::dynamic_pointer_cast<minifi::state::response::BoolValue>(prop_0_required.value.getValue())->getValue());
+}
+
+TEST_CASE("Test Valid Regex", "[validRegex]") {
+ minifi::state::response::ComponentManifest manifest("minifi-system");
+ auto serialized = manifest.serialize();
+ REQUIRE(serialized.size() > 0);
+ const auto &resp = serialized[0];
+ REQUIRE(resp.children.size() > 0);
+ const auto &processors = resp.children[0];
+ REQUIRE(processors.children.size() > 0);
+ const auto &proc_0 = processors.children[0];
+ REQUIRE(proc_0.children.size() > 0);
+ const auto &prop_descriptors = proc_0.children[0];
+ REQUIRE(prop_descriptors.children.size() > 0);
+ const auto &prop_0 = prop_descriptors.children[0];
+ REQUIRE(prop_0.children.size() >= 3);
+ const auto &df = prop_0.children[3];
+ REQUIRE("defaultValue" == df.name);
+ const auto &prop_0_valid_regex = prop_0.children[4];
+ REQUIRE("validRegex" == prop_0_valid_regex.name);
+}
+
+TEST_CASE("Test Relationships", "[rel1]") {
+ minifi::state::response::ComponentManifest manifest("minifi-system");
+ auto serialized = manifest.serialize();
+ REQUIRE(serialized.size() > 0);
+ const auto &resp = serialized[0];
+ REQUIRE(resp.children.size() > 0);
+ const auto &processors = resp.children[0];
+ REQUIRE(processors.children.size() > 0);
+ minifi::state::response::SerializedResponseNode proc_0;
+ for (const auto &node : processors.children) {
+ if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
+ proc_0 = node;
+ }
+ }
+ REQUIRE(proc_0.children.size() > 0);
+ const auto &relationships = proc_0.children[1];
+ REQUIRE("supportedRelationships" == relationships.name);
+ // this is because they are now nested
+ REQUIRE("supportedRelationships" == relationships.children[0].name);
+ REQUIRE("name" == relationships.children[0].children[0].name);
+ REQUIRE("failure" == relationships.children[0].children[0].value.to_string());
+ REQUIRE("description" == relationships.children[0].children[1].name);
+
+ REQUIRE("success" == relationships.children[1].children[0].value.to_string());
+ REQUIRE("description" == relationships.children[1].children[1].name);
+}
+
+TEST_CASE("Test Dependent", "[dependent]") {
+ minifi::state::response::ComponentManifest manifest("minifi-system");
+ auto serialized = manifest.serialize();
+ REQUIRE(serialized.size() > 0);
+ const auto &resp = serialized[0];
+ REQUIRE(resp.children.size() > 0);
+ const auto &processors = resp.children[0];
+ REQUIRE(processors.children.size() > 0);
+ minifi::state::response::SerializedResponseNode proc_0;
+ for (const auto &node : processors.children) {
+ if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
+ proc_0 = node;
+ }
+ }
+ REQUIRE(proc_0.children.size() > 0);
+ const auto &prop_descriptors = proc_0.children[0];
+ REQUIRE(prop_descriptors.children.size() > 0);
+ const auto &prop_0 = prop_descriptors.children[1];
+ REQUIRE(prop_0.children.size() >= 3);
+ REQUIRE("defaultValue" == prop_0.children[3].name);
+ REQUIRE("validRegex" == prop_0.children[4].name);
+ const auto &prop_0_dependent_0 = prop_descriptors.children[2];
+ REQUIRE("Directory" == prop_0_dependent_0.name);
+}
+
+TEST_CASE("Test Scheduling Defaults", "[schedDef]") {
+ minifi::state::response::AgentManifest manifest("minifi-system");
+ auto serialized = manifest.serialize();
+ REQUIRE(serialized.size() > 0);
+ minifi::state::response::SerializedResponseNode proc_0;
+ for (const auto &node : serialized) {
+ if ("schedulingDefaults" == node.name) {
+ proc_0 = node;
+ }
+ }
+ REQUIRE(proc_0.children.size() == 4);
+ for (const auto &child : proc_0.children) {
+ if ("defaultMaxConcurrentTasks" == child.name) {
+ REQUIRE("1" == child.value.to_string());
+ } else if ("defaultRunDurationNanos" == child.name) {
+ REQUIRE("0" == child.value.to_string());
+ } else if ("defaultSchedulingPeriod" == child.name) {
+ REQUIRE("1 sec" == child.value.to_string());
+ } else if ("defaultSchedulingStrategy" == child.name) {
+ REQUIRE("TIMER_DRIVEN" == child.value.to_string());
+ } else {
+ FAIL("UNKNOWQN NODE");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp b/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp
deleted file mode 100644
index 31f9b7b..0000000
--- a/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <memory>
-
-#include "../../include/core/Processor.h"
-#include "../../include/core/state/nodes/AgentInformation.h"
-#include "../TestBase.h"
-#include "io/ClientSocket.h"
-#include "core/Processor.h"
-#include "core/ClassLoader.h"
-
-TEST_CASE("Test Required", "[required]") {
- minifi::state::response::ComponentManifest manifest("minifi-system");
- auto serialized = manifest.serialize();
- REQUIRE(serialized.size() > 0);
- const auto &resp = serialized[0];
- REQUIRE(resp.children.size() > 0);
- const auto &processors = resp.children[0];
- REQUIRE(processors.children.size() > 0);
- const auto &proc_0 = processors.children[0];
- REQUIRE(proc_0.children.size() > 0);
- const auto &prop_descriptors = proc_0.children[0];
- REQUIRE(prop_descriptors.children.size() > 0);
- const auto &prop_0 = prop_descriptors.children[0];
- REQUIRE(prop_0.children.size() >= 3);
- const auto &prop_0_required = prop_0.children[2];
- REQUIRE("required" == prop_0_required.name);
- REQUIRE(!std::dynamic_pointer_cast<minifi::state::response::BoolValue>(prop_0_required.value.getValue())->getValue());
-}
-
-TEST_CASE("Test Valid Regex", "[validRegex]") {
- minifi::state::response::ComponentManifest manifest("minifi-system");
- auto serialized = manifest.serialize();
- REQUIRE(serialized.size() > 0);
- const auto &resp = serialized[0];
- REQUIRE(resp.children.size() > 0);
- const auto &processors = resp.children[0];
- REQUIRE(processors.children.size() > 0);
- const auto &proc_0 = processors.children[0];
- REQUIRE(proc_0.children.size() > 0);
- const auto &prop_descriptors = proc_0.children[0];
- REQUIRE(prop_descriptors.children.size() > 0);
- const auto &prop_0 = prop_descriptors.children[0];
- REQUIRE(prop_0.children.size() >= 3);
- const auto &df = prop_0.children[3];
- REQUIRE("defaultValue" == df.name);
- const auto &prop_0_valid_regex = prop_0.children[4];
- REQUIRE("validRegex" == prop_0_valid_regex.name);
-}
-
-TEST_CASE("Test Relationships", "[rel1]") {
- minifi::state::response::ComponentManifest manifest("minifi-system");
- auto serialized = manifest.serialize();
- REQUIRE(serialized.size() > 0);
- const auto &resp = serialized[0];
- REQUIRE(resp.children.size() > 0);
- const auto &processors = resp.children[0];
- REQUIRE(processors.children.size() > 0);
- minifi::state::response::SerializedResponseNode proc_0;
- for (const auto &node : processors.children) {
- if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
- proc_0 = node;
- }
- }
- REQUIRE(proc_0.children.size() > 0);
- const auto &relationships = proc_0.children[1];
- REQUIRE("supportedRelationships" == relationships.name);
- // this is because they are now nested
- REQUIRE("supportedRelationships" == relationships.children[0].name);
- REQUIRE("name" == relationships.children[0].children[0].name);
- REQUIRE("failure" == relationships.children[0].children[0].value.to_string());
- REQUIRE("description" == relationships.children[0].children[1].name);
-
- REQUIRE("success" == relationships.children[1].children[0].value.to_string());
- REQUIRE("description" == relationships.children[1].children[1].name);
-
-
-}
-
-
-TEST_CASE("Test Dependent", "[dependent]") {
- minifi::state::response::ComponentManifest manifest("minifi-system");
- auto serialized = manifest.serialize();
- REQUIRE(serialized.size() > 0);
- const auto &resp = serialized[0];
- REQUIRE(resp.children.size() > 0);
- const auto &processors = resp.children[0];
- REQUIRE(processors.children.size() > 0);
- minifi::state::response::SerializedResponseNode proc_0;
- for (const auto &node : processors.children) {
- if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
- proc_0 = node;
- }
- }
- REQUIRE(proc_0.children.size() > 0);
- const auto &prop_descriptors = proc_0.children[0];
- REQUIRE(prop_descriptors.children.size() > 0);
- const auto &prop_0 = prop_descriptors.children[1];
- REQUIRE(prop_0.children.size() >= 3);
- REQUIRE("defaultValue" == prop_0.children[3].name);
- REQUIRE("validRegex" == prop_0.children[4].name);
- const auto &prop_0_dependent_0 = prop_descriptors.children[2];
- REQUIRE("Directory" == prop_0_dependent_0.name);
-}