You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/12 02:38:01 UTC

nifi-minifi-cpp git commit: MINIFICPP-634: Add RPG and tests

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 51cdbb773 -> e0d45609b


MINIFICPP-634: Add RPG and tests

This closes #412.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e0d45609
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e0d45609
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e0d45609

Branch: refs/heads/master
Commit: e0d45609bff83f89decfeb8a8fc7700020f19110
Parents: 51cdbb7
Author: Marc Parisi <ph...@apache.org>
Authored: Thu Oct 4 16:03:56 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Oct 11 22:37:39 2018 -0400

----------------------------------------------------------------------
 libminifi/src/RemoteProcessorGroupPort.cpp | 21 +++++---
 libminifi/test/unit/ProcessorTests.cpp     | 69 +++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e0d45609/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 8bed630..7270361 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -33,6 +33,7 @@
 #include <utility>
 
 #include "sitetosite/Peer.h"
+#include "Exception.h"
 #include "sitetosite/SiteToSiteFactory.h"
 
 #include "rapidjson/document.h"
@@ -162,16 +163,19 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
    * we must rely on the configured host/port
    */
   if (peers_.empty() && is_http_disabled()) {
-    std::string host;
-    int configured_port;
+    std::string host, portStr;
+    int configured_port = -1;
+    // place hostname/port into the log message if we have it
     context->getProperty(hostName.getName(), host);
-
-    int64_t lvalue;
-    if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
-      configured_port = static_cast<int>(lvalue);
+    context->getProperty(port.getName(), portStr);
+    if (!host.empty() && !portStr.empty() && !portStr.empty() && core::Property::StringToInt(portStr, configured_port)) {
+      nifi_instances_.push_back({ host, configured_port, "" });
+      bypass_rest_api_ = true;
+    } else {
+      // we cannot proceed, so log error and throw an exception
+      logger_->log_error("%s/%s/%d -- configuration values after eval of configuration options", host, portStr, configured_port);
+      throw(Exception(SITE2SITE_EXCEPTION, "HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule"));
     }
-    nifi_instances_.push_back({ host, configured_port, "" });
-    bypass_rest_api_ = true;
   }
   // populate the site2site protocol for load balancing between them
   if (peers_.size() > 0) {
@@ -194,6 +198,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
     }
   } else {
     // we don't have any peers
+    logger_->log_error("No peers selected during scheduling");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e0d45609/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 32ba181..88f9b77 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -380,6 +380,75 @@ TEST_CASE("TestEmptyContent", "[emptyContent]") {
   LogTestController::getInstance().reset();
 }
 
+/**
+ * Exercises the RPG bypass path of RemoteProcessorGroupPort::onSchedule.
+ *
+ * A RemoteProcessorGroupPort is created with the supplied host/port properties. When
+ * hasException is true, onSchedule is invoked and must throw the site-to-site scheduling
+ * error; the error log must also contain "<host>/<port>/<portVal>".
+ *
+ * @param host host name to configure on the RPG (may be empty)
+ * @param port port string to configure on the RPG (may be empty or non-numeric)
+ * @param portVal port value to search for in the corresponding log message
+ * @param hasException dictates if a failure should occur; when false, scheduling is not attempted
+ */
+void testRPGBypass(const std::string &host, const std::string &port, const std::string &portVal = "-1", bool hasException = true) {
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
+  LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+  LogTestController::getInstance().setTrace<TestPlan>();
+
+  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+  auto factory = minifi::io::StreamFactory::getInstance(configuration);
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  // the same repository instance serves as both repositories handed to the process context
+  auto repo = std::make_shared<TestRepository>();
+
+  auto rpg = std::make_shared<minifi::RemoteProcessorGroupPort>(factory, "rpg", "http://localhost:8989/nifi", configuration);
+  rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host);
+  rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port);
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(rpg);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+  auto psf = std::make_shared<core::ProcessSessionFactory>(context);
+  if (hasException) {
+    const std::string expected_error = "Site2Site Protocol:HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule";
+    bool thrown = false;
+    try {
+      rpg->onSchedule(context, psf);
+    } catch (const std::exception &e) {
+      thrown = true;
+      REQUIRE(expected_error == std::string(e.what()));
+    }
+    // a silent return from onSchedule must fail the test instead of slipping through the try block
+    REQUIRE(thrown);
+    std::stringstream search_expr;
+    search_expr << " " << host << "/" << port << "/" << portVal << " -- configuration values after eval of configuration options";
+    REQUIRE(LogTestController::getInstance().contains(search_expr.str()));
+  }
+  // NOTE(review): when hasException is false nothing is scheduled or asserted, so callers
+  // passing false only exercise the setup above -- confirm whether a success-path check is wanted.
+  LogTestController::getInstance().reset();
+}
+
+/**
+ * Since there is no curl loaded in this test folder, we will have is_http_disabled be true.
+ * Each case below delegates to testRPGBypass; unless stated otherwise the helper's defaults
+ * apply (portVal "-1", hasException true), i.e. scheduling is expected to fail.
+ */
+// Neither host nor port configured.
+TEST_CASE("TestRPGNoSettings", "[TestRPG1]") {
+  testRPGBypass("", "");
+}
+
+// Host configured, port left empty.
+TEST_CASE("TestRPGWithHost", "[TestRPG2]") {
+  testRPGBypass("hostname", "");
+}
+
+// Host configured, port is a non-numeric string.
+TEST_CASE("TestRPGWithHostInvalidPort", "[TestRPG3]") {
+  testRPGBypass("hostname", "hostname");
+}
+
+// Numeric port configured, host left empty.
+TEST_CASE("TestRPGWithoutHostValidPort", "[TestRPG4]") {
+  testRPGBypass("", "8080");
+}
+
+// Host left empty, port is a non-numeric string.
+TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
+  testRPGBypass("", "hostname");
+}
+
+// Passes hasException = false, so the helper performs setup only.
+// NOTE(review): the host argument is empty here even though the case is named "Valid" -- confirm intent.
+TEST_CASE("TestRPGValid", "[TestRPG6]") {
+  testRPGBypass("", "8080", "8080", false);
+}
+
 int fileSize(const char *add) {
   std::ifstream mySource;
   mySource.open(add, std::ios_base::binary);