You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/08/28 17:01:28 UTC

[2/2] nifi-minifi-cpp git commit: MINIFICPP-601: Add scheduling information

MINIFICPP-601: Add scheduling information

This closes #392.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2ecb997c
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2ecb997c
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2ecb997c

Branch: refs/heads/master
Commit: 2ecb997c72a5b6d80fb73d1eaf67b1e30cbf5c31
Parents: 7ac309b
Author: Marc Parisi <ph...@apache.org>
Authored: Tue Aug 21 20:22:57 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Aug 28 12:59:54 2018 -0400

----------------------------------------------------------------------
 libminifi/include/core/Processor.h              |   9 +-
 libminifi/include/core/ProcessorConfig.h        |   8 ++
 .../include/core/state/nodes/AgentInformation.h |  58 ++++++++
 libminifi/src/core/Processor.cpp                |   5 +-
 libminifi/src/core/yaml/YamlConfiguration.cpp   |   6 +-
 libminifi/test/unit/ManifestTests.cpp           | 143 +++++++++++++++++++
 .../unit/PropertyValidationAgentInfoTests.cpp   | 120 ----------------
 7 files changed, 218 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 18fe2a0..6426f4c 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -53,9 +53,6 @@ namespace core {
 // Minimum scheduling period in Nano Second
 #define MINIMUM_SCHEDULING_NANOS 30000
 
-// Default yield period in second
-#define DEFAULT_YIELD_PERIOD_SECONDS 1
-
 // Default penalization period in second
 #define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
 
@@ -107,11 +104,11 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub
   }
   // Set Processor Run Duration in Nano Second
   void setRunDurationNano(uint64_t period) {
-    run_durantion_nano_ = period;
+    run_duration_nano_ = period;
   }
   // Get Processor Run Duration in Nano Second
   uint64_t getRunDurationNano(void) {
-    return (run_durantion_nano_);
+    return (run_duration_nano_);
   }
   // Set Processor yield period in MilliSecond
   void setYieldPeriodMsec(uint64_t period) {
@@ -258,7 +255,7 @@ class __attribute__((visibility("default"))) Processor : public Connectable, pub
   // SchedulePeriod in Nano Seconds
   std::atomic<uint64_t> scheduling_period_nano_;
   // Run Duration in Nano Seconds
-  std::atomic<uint64_t> run_durantion_nano_;
+  std::atomic<uint64_t> run_duration_nano_;
   // Yield Period in Milliseconds
   std::atomic<uint64_t> yield_period_msec_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/ProcessorConfig.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index c1d563e..aa8e18c 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -26,6 +26,14 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
+// Processor defaults shared by the Processor constructor, the YAML parser and the agent manifest
+#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN"
+#define DEFAULT_SCHEDULING_PERIOD "1 sec"
+#define DEFAULT_RUN_DURATION 0
+#define DEFAULT_MAX_CONCURRENT_TASKS 1
+// Default yield period in seconds
+#define DEFAULT_YIELD_PERIOD_SECONDS 1
+
 struct ProcessorConfig {
   std::string id;
   std::string name;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/include/core/state/nodes/AgentInformation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index baafbbe..48a451c 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -48,6 +48,7 @@
 #include "agent/build_description.h"
 #include "core/ClassLoader.h"
 #include "../nodes/StateMonitor.h"
+#include "core/ProcessorConfig.h"
 
 namespace org {
 namespace apache {
@@ -284,6 +285,57 @@ class Bundles : public DeviceInformation {
 
 };
 
+/**
+ * Justification and Purpose: Reports the default processor scheduling settings
+ * (strategy, period, run duration and max concurrent tasks) as a response node.
+ */
+class SchedulingDefaults : public DeviceInformation {
+ public:
+  SchedulingDefaults(std::string name, uuid_t uuid)
+      : DeviceInformation(name, uuid) {
+  }
+
+  SchedulingDefaults(const std::string &name)
+      : DeviceInformation(name, 0) {
+  }
+
+  // Name under which this node is reported, independent of the constructor argument
+  std::string getName() const {
+    return "schedulingDefaults";
+  }
+
+  // Builds a single "schedulingDefaults" node whose children carry the compile-time defaults
+  std::vector<SerializedResponseNode> serialize() {
+    SerializedResponseNode strategy;
+    strategy.name = "defaultSchedulingStrategy";
+    strategy.value = DEFAULT_SCHEDULING_STRATEGY;
+
+    SerializedResponseNode period;
+    period.name = "defaultSchedulingPeriod";
+    period.value = DEFAULT_SCHEDULING_PERIOD;
+
+    SerializedResponseNode runDuration;
+    runDuration.name = "defaultRunDurationNanos";
+    runDuration.value = DEFAULT_RUN_DURATION;
+
+    SerializedResponseNode maxTasks;
+    maxTasks.name = "defaultMaxConcurrentTasks";
+    maxTasks.value = DEFAULT_MAX_CONCURRENT_TASKS;
+
+    SerializedResponseNode root;
+    root.name = "schedulingDefaults";
+    // child order is part of the serialized output; keep it stable
+    root.children.push_back(strategy);
+    root.children.push_back(period);
+    root.children.push_back(runDuration);
+    root.children.push_back(maxTasks);
+    std::vector<SerializedResponseNode> result;
+    result.push_back(root);
+    return result;
+  }
+
+};
+
 /**
  * Justification and Purpose: Provides available extensions for the agent information block.
  */
@@ -491,6 +543,12 @@ class AgentManifest : public DeviceInformation {
       serialized.push_back(bundle);
     }
 
+    SchedulingDefaults defaults("schedulingDefaults", nullptr);
+
+    for (auto defaultNode : defaults.serialize()) {
+      serialized.push_back(defaultNode);
+    }
+
     return serialized;
   }
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 7adf718..d49bd55 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -31,6 +31,7 @@
 #include <functional>
 #include <utility>
 #include "Connection.h"
+#include "core/ProcessorConfig.h"
 #include "core/Connectable.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -55,10 +56,10 @@ Processor::Processor(std::string name, uuid_t uuid)
   loss_tolerant_ = false;
   _triggerWhenEmpty = false;
   scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
-  run_durantion_nano_ = 0;
+  run_duration_nano_ = DEFAULT_RUN_DURATION;
   yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
   _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
-  max_concurrent_tasks_ = 1;
+  max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
   active_tasks_ = 0;
   yield_expiration_ = 0;
   incoming_connections_Iter = this->_incomingConnections.begin();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 902bd89..c850afc 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -125,14 +125,14 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
 
         auto strategyNode = getOptionalField(&procNode,
                                              "scheduling strategy",
-                                             YAML::Node("EVENT_DRIVEN"),
+                                             YAML::Node(DEFAULT_SCHEDULING_STRATEGY),
                                              CONFIG_YAML_PROCESSORS_KEY);
         procCfg.schedulingStrategy = strategyNode.as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
         auto periodNode = getOptionalField(&procNode,
                                            "scheduling period",
-                                           YAML::Node("1 sec"),
+                                           YAML::Node(DEFAULT_SCHEDULING_PERIOD),
                                            CONFIG_YAML_PROCESSORS_KEY);
         procCfg.schedulingPeriod = periodNode.as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
@@ -153,7 +153,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
         }
 
         if (procNode["run duration nanos"]) {
-          procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+          procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>();
           logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/test/unit/ManifestTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ManifestTests.cpp b/libminifi/test/unit/ManifestTests.cpp
new file mode 100644
index 0000000..09e278a
--- /dev/null
+++ b/libminifi/test/unit/ManifestTests.cpp
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "../../include/core/Processor.h"
+#include "../../include/core/state/nodes/AgentInformation.h"
+#include "../TestBase.h"
+#include "io/ClientSocket.h"
+#include "core/Processor.h"
+#include "core/ClassLoader.h"
+
+TEST_CASE("Test Required", "[required]") {  // first property of the first processor must not be required
+  minifi::state::response::ComponentManifest manifest("minifi-system");
+  auto serialized = manifest.serialize();
+  REQUIRE(serialized.size() > 0);
+  const auto &resp = serialized[0];
+  REQUIRE(resp.children.size() > 0);
+  const auto &processors = resp.children[0];
+  REQUIRE(processors.children.size() > 0);
+  const auto &proc_0 = processors.children[0];
+  REQUIRE(proc_0.children.size() > 0);
+  const auto &prop_descriptors = proc_0.children[0];
+  REQUIRE(prop_descriptors.children.size() > 0);
+  const auto &prop_0 = prop_descriptors.children[0];
+  REQUIRE(prop_0.children.size() >= 3);  // index 2 is read below
+  REQUIRE("required" == prop_0.children[2].name);
+  const auto required_flag = std::dynamic_pointer_cast<minifi::state::response::BoolValue>(prop_0.children[2].value.getValue());
+  REQUIRE((required_flag && !required_flag->getValue()));  // a failed cast now fails the test instead of dereferencing null
+}
+
+TEST_CASE("Test Valid Regex", "[validRegex]") {  // first property must expose defaultValue and validRegex
+  minifi::state::response::ComponentManifest manifest("minifi-system");
+  auto serialized = manifest.serialize();
+  REQUIRE(serialized.size() > 0);
+  const auto &resp = serialized[0];
+  REQUIRE(resp.children.size() > 0);
+  const auto &processors = resp.children[0];
+  REQUIRE(processors.children.size() > 0);
+  const auto &proc_0 = processors.children[0];
+  REQUIRE(proc_0.children.size() > 0);
+  const auto &prop_descriptors = proc_0.children[0];
+  REQUIRE(prop_descriptors.children.size() > 0);
+  const auto &prop_0 = prop_descriptors.children[0];
+  REQUIRE(prop_0.children.size() >= 5);  // indices 3 and 4 are read below
+  const auto &df = prop_0.children[3];
+  REQUIRE("defaultValue" == df.name);
+  const auto &prop_0_valid_regex = prop_0.children[4];
+  REQUIRE("validRegex" == prop_0_valid_regex.name);
+}
+
+TEST_CASE("Test Relationships", "[rel1]") {  // PutFile must list the failure and success relationships, in that order
+  minifi::state::response::ComponentManifest manifest("minifi-system");
+  auto serialized = manifest.serialize();
+  REQUIRE(serialized.size() > 0);
+  const auto &resp = serialized[0];
+  REQUIRE(resp.children.size() > 0);
+  const auto &processors = resp.children[0];
+  REQUIRE(processors.children.size() > 0);
+  minifi::state::response::SerializedResponseNode proc_0;
+  for (const auto &node : processors.children) {
+    if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
+      proc_0 = node;
+    }
+  }
+  REQUIRE(proc_0.children.size() > 1);  // index 1 is read below
+  const auto &relationships = proc_0.children[1];
+  REQUIRE("supportedRelationships" == relationships.name);
+  REQUIRE(relationships.children.size() >= 2);  // both entries are inspected below
+  // this is because they are now nested
+  REQUIRE("supportedRelationships" == relationships.children[0].name);
+  REQUIRE("name" == relationships.children[0].children[0].name);
+  REQUIRE("failure" == relationships.children[0].children[0].value.to_string());
+  REQUIRE("description" == relationships.children[0].children[1].name);
+  REQUIRE("success" == relationships.children[1].children[0].value.to_string());
+  REQUIRE("description" == relationships.children[1].children[1].name);
+}
+
+TEST_CASE("Test Dependent", "[dependent]") {  // inspects the PutFile property descriptors at indices 1 and 2
+  minifi::state::response::ComponentManifest manifest("minifi-system");
+  auto serialized = manifest.serialize();
+  REQUIRE(serialized.size() > 0);
+  const auto &resp = serialized[0];
+  REQUIRE(resp.children.size() > 0);
+  const auto &processors = resp.children[0];
+  REQUIRE(processors.children.size() > 0);
+  minifi::state::response::SerializedResponseNode proc_0;
+  for (const auto &node : processors.children) {
+    if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
+      proc_0 = node;
+    }
+  }
+  REQUIRE(proc_0.children.size() > 0);
+  const auto &prop_descriptors = proc_0.children[0];
+  REQUIRE(prop_descriptors.children.size() > 2);  // indices 1 and 2 are read below
+  const auto &prop_0 = prop_descriptors.children[1];
+  REQUIRE(prop_0.children.size() >= 5);  // indices 3 and 4 are read below
+  REQUIRE("defaultValue" == prop_0.children[3].name);
+  REQUIRE("validRegex" == prop_0.children[4].name);
+  const auto &prop_0_dependent_0 = prop_descriptors.children[2];
+  REQUIRE("Directory" == prop_0_dependent_0.name);
+}
+
+TEST_CASE("Test Scheduling Defaults", "[schedDef]") {  // the agent manifest must report the four scheduling defaults
+  minifi::state::response::AgentManifest manifest("minifi-system");
+  auto serialized = manifest.serialize();
+  REQUIRE(serialized.size() > 0);
+  minifi::state::response::SerializedResponseNode scheduling_defaults;
+  for (const auto &node : serialized) {
+    if ("schedulingDefaults" == node.name) {
+      scheduling_defaults = node;
+    }
+  }
+  REQUIRE(scheduling_defaults.children.size() == 4);  // also fails when the node was not found (no children)
+  for (const auto &child : scheduling_defaults.children) {
+    if ("defaultMaxConcurrentTasks" == child.name) {
+      REQUIRE("1" == child.value.to_string());
+    } else if ("defaultRunDurationNanos" == child.name) {
+      REQUIRE("0" == child.value.to_string());
+    } else if ("defaultSchedulingPeriod" == child.name) {
+      REQUIRE("1 sec" == child.value.to_string());
+    } else if ("defaultSchedulingStrategy" == child.name) {
+      REQUIRE("TIMER_DRIVEN" == child.value.to_string());
+    } else {
+      FAIL("UNKNOWN NODE");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2ecb997c/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp b/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp
deleted file mode 100644
index 31f9b7b..0000000
--- a/libminifi/test/unit/PropertyValidationAgentInfoTests.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <memory>
-
-#include "../../include/core/Processor.h"
-#include "../../include/core/state/nodes/AgentInformation.h"
-#include "../TestBase.h"
-#include "io/ClientSocket.h"
-#include "core/Processor.h"
-#include "core/ClassLoader.h"
-
-TEST_CASE("Test Required", "[required]") {
-  minifi::state::response::ComponentManifest manifest("minifi-system");
-  auto serialized = manifest.serialize();
-  REQUIRE(serialized.size() > 0);
-  const auto &resp = serialized[0];
-  REQUIRE(resp.children.size() > 0);
-  const auto &processors = resp.children[0];
-  REQUIRE(processors.children.size() > 0);
-  const auto &proc_0 = processors.children[0];
-  REQUIRE(proc_0.children.size() > 0);
-  const auto &prop_descriptors = proc_0.children[0];
-  REQUIRE(prop_descriptors.children.size() > 0);
-  const auto &prop_0 = prop_descriptors.children[0];
-  REQUIRE(prop_0.children.size() >= 3);
-  const auto &prop_0_required = prop_0.children[2];
-  REQUIRE("required" == prop_0_required.name);
-  REQUIRE(!std::dynamic_pointer_cast<minifi::state::response::BoolValue>(prop_0_required.value.getValue())->getValue());
-}
-
-TEST_CASE("Test Valid Regex", "[validRegex]") {
-  minifi::state::response::ComponentManifest manifest("minifi-system");
-  auto serialized = manifest.serialize();
-  REQUIRE(serialized.size() > 0);
-  const auto &resp = serialized[0];
-  REQUIRE(resp.children.size() > 0);
-  const auto &processors = resp.children[0];
-  REQUIRE(processors.children.size() > 0);
-  const auto &proc_0 = processors.children[0];
-  REQUIRE(proc_0.children.size() > 0);
-  const auto &prop_descriptors = proc_0.children[0];
-  REQUIRE(prop_descriptors.children.size() > 0);
-  const auto &prop_0 = prop_descriptors.children[0];
-  REQUIRE(prop_0.children.size() >= 3);
-  const auto &df = prop_0.children[3];
-  REQUIRE("defaultValue" == df.name);
-  const auto &prop_0_valid_regex = prop_0.children[4];
-  REQUIRE("validRegex" == prop_0_valid_regex.name);
-}
-
-TEST_CASE("Test Relationships", "[rel1]") {
-  minifi::state::response::ComponentManifest manifest("minifi-system");
-  auto serialized = manifest.serialize();
-  REQUIRE(serialized.size() > 0);
-  const auto &resp = serialized[0];
-  REQUIRE(resp.children.size() > 0);
-  const auto &processors = resp.children[0];
-  REQUIRE(processors.children.size() > 0);
-  minifi::state::response::SerializedResponseNode proc_0;
-  for (const auto &node : processors.children) {
-    if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
-      proc_0 = node;
-    }
-  }
-  REQUIRE(proc_0.children.size() > 0);
-  const auto &relationships = proc_0.children[1];
-  REQUIRE("supportedRelationships" == relationships.name);
-  // this is because they are now nested
-  REQUIRE("supportedRelationships" ==  relationships.children[0].name);
-  REQUIRE("name" ==  relationships.children[0].children[0].name);
-  REQUIRE("failure" ==  relationships.children[0].children[0].value.to_string());
-  REQUIRE("description" ==  relationships.children[0].children[1].name);
-
-  REQUIRE("success" ==  relationships.children[1].children[0].value.to_string());
-  REQUIRE("description" ==  relationships.children[1].children[1].name);
-
-
-}
-
-
-TEST_CASE("Test Dependent", "[dependent]") {
-  minifi::state::response::ComponentManifest manifest("minifi-system");
-  auto serialized = manifest.serialize();
-  REQUIRE(serialized.size() > 0);
-  const auto &resp = serialized[0];
-  REQUIRE(resp.children.size() > 0);
-  const auto &processors = resp.children[0];
-  REQUIRE(processors.children.size() > 0);
-  minifi::state::response::SerializedResponseNode proc_0;
-  for (const auto &node : processors.children) {
-    if ("org::apache::nifi::minifi::processors::PutFile" == node.name) {
-      proc_0 = node;
-    }
-  }
-  REQUIRE(proc_0.children.size() > 0);
-  const auto &prop_descriptors = proc_0.children[0];
-  REQUIRE(prop_descriptors.children.size() > 0);
-  const auto &prop_0 = prop_descriptors.children[1];
-  REQUIRE(prop_0.children.size() >= 3);
-  REQUIRE("defaultValue" == prop_0.children[3].name);
-  REQUIRE("validRegex" == prop_0.children[4].name);
-  const auto &prop_0_dependent_0 = prop_descriptors.children[2];
-  REQUIRE("Directory" == prop_0_dependent_0.name);
-}