You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/12 02:38:01 UTC
nifi-minifi-cpp git commit: MINIFICPP-634: Add RPG and tests
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 51cdbb773 -> e0d45609b
MINIFICPP-634: Add RPG and tests
This closes #412.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e0d45609
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e0d45609
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e0d45609
Branch: refs/heads/master
Commit: e0d45609bff83f89decfeb8a8fc7700020f19110
Parents: 51cdbb7
Author: Marc Parisi <ph...@apache.org>
Authored: Thu Oct 4 16:03:56 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Oct 11 22:37:39 2018 -0400
----------------------------------------------------------------------
libminifi/src/RemoteProcessorGroupPort.cpp | 21 +++++---
libminifi/test/unit/ProcessorTests.cpp | 69 +++++++++++++++++++++++++
2 files changed, 82 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e0d45609/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 8bed630..7270361 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -33,6 +33,7 @@
#include <utility>
#include "sitetosite/Peer.h"
+#include "Exception.h"
#include "sitetosite/SiteToSiteFactory.h"
#include "rapidjson/document.h"
@@ -162,16 +163,19 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
* we must rely on the configured host/port
*/
if (peers_.empty() && is_http_disabled()) {
- std::string host;
- int configured_port;
+ std::string host, portStr;
+ int configured_port = -1;
+ // place hostname/port into the log message if we have it
context->getProperty(hostName.getName(), host);
-
- int64_t lvalue;
- if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
- configured_port = static_cast<int>(lvalue);
+ context->getProperty(port.getName(), portStr);
+ if (!host.empty() && !portStr.empty() && !portStr.empty() && core::Property::StringToInt(portStr, configured_port)) {
+ nifi_instances_.push_back({ host, configured_port, "" });
+ bypass_rest_api_ = true;
+ } else {
+ // we cannot proceed, so log error and throw an exception
+ logger_->log_error("%s/%s/%d -- configuration values after eval of configuration options", host, portStr, configured_port);
+ throw(Exception(SITE2SITE_EXCEPTION, "HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule"));
}
- nifi_instances_.push_back({ host, configured_port, "" });
- bypass_rest_api_ = true;
}
// populate the site2site protocol for load balancing between them
if (peers_.size() > 0) {
@@ -194,6 +198,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
}
} else {
// we don't have any peers
+ logger_->log_error("No peers selected during scheduling");
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e0d45609/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 32ba181..88f9b77 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -380,6 +380,75 @@ TEST_CASE("TestEmptyContent", "[emptyContent]") {
LogTestController::getInstance().reset();
}
+/**
+ * Tests the RPG bypass feature
+ * @param host to configure
+ * @param port port string to configure
+ * @param portVal port value to search in the corresponding log message
+ * @param hasException dictates if a failure should occur
+ */
+void testRPGBypass(const std::string &host, const std::string &port, const std::string &portVal = "-1", bool hasException = true) {
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setTrace<TestPlan>();
+
+ auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
+ auto factory = minifi::io::StreamFactory::getInstance(configuration);
+
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+ std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+ auto rpg = std::make_shared<minifi::RemoteProcessorGroupPort>(factory, "rpg", "http://localhost:8989/nifi", configuration);
+ rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host);
+ rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(rpg);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+ auto psf = std::make_shared<core::ProcessSessionFactory>(context);
+ if (hasException) {
+ auto expected_error = "Site2Site Protocol:HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule";
+ try {
+ rpg->onSchedule(context, psf);
+ } catch (std::exception &e) {
+ REQUIRE(expected_error == std::string(e.what()));
+ }
+ std::stringstream search_expr;
+ search_expr << " " << host << "/" << port << "/" << portVal << " -- configuration values after eval of configuration options";
+ REQUIRE(LogTestController::getInstance().contains(search_expr.str()));
+ }
+ LogTestController::getInstance().reset();
+}
+
+/**
+ * Since there is no curl loaded in this test folder, we will have is_http_disabled be true.
+ */
+TEST_CASE("TestRPGNoSettings", "[TestRPG1]") {
+ testRPGBypass("", "");
+}
+
+TEST_CASE("TestRPGWithHost", "[TestRPG2]") {
+ testRPGBypass("hostname", "");
+}
+
+TEST_CASE("TestRPGWithHostInvalidPort", "[TestRPG3]") {
+ testRPGBypass("hostname", "hostname");
+}
+
+TEST_CASE("TestRPGWithoutHostValidPort", "[TestRPG4]") {
+ testRPGBypass("", "8080");
+}
+
+TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
+ testRPGBypass("", "hostname");
+}
+
+TEST_CASE("TestRPGValid", "[TestRPG6]") {
+ testRPGBypass("", "8080", "8080", false);
+}
+
int fileSize(const char *add) {
std::ifstream mySource;
mySource.open(add, std::ios_base::binary);