You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/08/28 20:07:59 UTC
nifi-minifi-cpp git commit: MINIFICPP-592: Update RPG to fall back
when cURL is not enabled
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master 2ecb997c7 -> a68ed87ed
MINIFICPP-592: Update RPG to fall back when cURL is not enabled
This closes #389.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a68ed87e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a68ed87e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a68ed87e
Branch: refs/heads/master
Commit: a68ed87ed1776b41bacef2d45fa52a9747c6a2a7
Parents: 2ecb997
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Aug 6 15:49:04 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Aug 28 16:07:31 2018 -0400
----------------------------------------------------------------------
libminifi/include/RemoteProcessorGroupPort.h | 28 +++++++++++----
libminifi/src/RemoteProcessorGroupPort.cpp | 42 +++++++++++++----------
2 files changed, 45 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a68ed87e/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index aece744..170bb49 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -44,20 +44,20 @@ namespace minifi {
* and decrements based on its construction. Using RAII we should
* never have the concern of thread safety.
*/
-class RPGLatch{
+class RPGLatch {
public:
- RPGLatch(bool increment = true){
- static std::atomic<int> latch_count (0);
+ RPGLatch(bool increment = true) {
+ static std::atomic<int> latch_count(0);
count = &latch_count;
if (increment)
count++;
}
- ~RPGLatch(){
+ ~RPGLatch() {
count--;
}
- int getCount(){
+ int getCount() {
return *count;
}
@@ -80,8 +80,9 @@ class RemoteProcessorGroupPort : public core::Processor {
timeout_(0),
url_(url),
http_enabled_(false),
+ bypass_rest_api_(false),
ssl_service(nullptr),
- logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()){
+ logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()) {
client_type_ = sitetosite::CLIENT_TYPE::RAW;
stream_factory_ = stream_factory;
if (uuid != nullptr) {
@@ -168,6 +169,19 @@ class RemoteProcessorGroupPort : public core::Processor {
protected:
+ /**
+ * Non static in case anything is loaded when this object is re-scheduled
+ */
+ bool is_http_disabled() {
+ auto ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
+ if (ptr != nullptr) {
+ delete ptr;
+ return false;
+ } else {
+ return true;
+ }
+ }
+
std::shared_ptr<io::StreamFactory> stream_factory_;
std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create);
void returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol);
@@ -195,6 +209,8 @@ class RemoteProcessorGroupPort : public core::Processor {
// http proxy
utils::HTTPProxy proxy_;
+ bool bypass_rest_api_;
+
sitetosite::CLIENT_TYPE client_type_;
// Remote Site2Site Info
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a68ed87e/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 629075c..519fec1 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -66,7 +66,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
if (!available_protocols_.try_dequeue(nextProtocol)) {
if (create) {
// create
- if (url_.empty()) {
+ if (url_.empty() || bypass_rest_api_) {
sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, port_, ssl_service != nullptr), this->getInterface(), client_type_);
config.setHTTPProxy(this->proxy_);
nextProtocol = sitetosite::createClient(config);
@@ -164,6 +164,20 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
if (peers_.size() > 0)
peer_index_ = 0;
}
+ /**
+ * If at this point we have no peers and HTTP support is disabled this means
+ * we must rely on the configured host/port
+ */
+ if (peers_.empty() && is_http_disabled()) {
+ context->getProperty(hostName.getName(), host_);
+
+ int64_t lvalue;
+ if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
+ port_ = static_cast<int>(lvalue);
+ site2site_port_ = port_;
+ }
+ bypass_rest_api_ = true;
+ }
// populate the site2site protocol for load balancing between them
if (peers_.size() > 0) {
auto count = peers_.size();
@@ -183,6 +197,8 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
logger_->log_trace("Created client, moving into available protocols");
returnProtocol(std::move(nextProtocol));
}
+ } else {
+ // we don't have any peers
}
}
@@ -209,20 +225,6 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessCont
std::string value;
logger_->log_trace("On trigger %s", getUUIDStr());
- if (url_.empty()) {
- if (context->getProperty(hostName.getName(), value) && !value.empty()) {
- host_ = value;
- }
-
- int64_t lvalue;
- if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
- port_ = static_cast<int>(lvalue);
- }
- }
-
- if (context->getProperty(portUUID.getName(), value) && !value.empty()) {
- uuid_parse(value.c_str(), protocol_uuid_);
- }
std::unique_ptr<sitetosite::SiteToSiteClient> protocol_ = nullptr;
try {
@@ -257,7 +259,6 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/site-to-site";
- this->site2site_port_ = -1;
configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_);
configure_->get(Configure::nifi_rest_api_password, this->rest_password_);
@@ -290,9 +291,10 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
if (nullptr == client_ptr) {
- logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
+ logger_->log_error("Could not locate HTTPClient. You do not have cURL support, defaulting to base configuration!");
return;
}
+ this->site2site_port_ = -1;
client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
client->initialize("GET", fullUrl.c_str(), ssl_service);
if (ssl_service) {
@@ -315,7 +317,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
const std::vector<char> &response_body = client->getResponseBody();
if (!response_body.empty()) {
std::string controller = std::string(response_body.begin(), response_body.end());
- logger_->log_debug("controller config %s", controller);
+ logger_->log_trace("controller config %s", controller);
rapidjson::Document doc;
rapidjson::ParseResult ok = doc.Parse(controller.c_str());
@@ -348,8 +350,10 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
void RemoteProcessorGroupPort::refreshPeerList() {
refreshRemoteSite2SiteInfo();
- if (site2site_port_ == -1)
+ if (site2site_port_ == -1) {
+ logger_->log_debug("No port configured");
return;
+ }
this->peers_.clear();