You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/08/28 20:07:59 UTC

nifi-minifi-cpp git commit: MINIFICPP-592: Update RPG to fall back when cURL is not enable

Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 2ecb997c7 -> a68ed87ed


MINIFICPP-592: Update RPG to fall back when cURL is not enable

This closes #389.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a68ed87e
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a68ed87e
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a68ed87e

Branch: refs/heads/master
Commit: a68ed87ed1776b41bacef2d45fa52a9747c6a2a7
Parents: 2ecb997
Author: Marc Parisi <ph...@apache.org>
Authored: Mon Aug 6 15:49:04 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Tue Aug 28 16:07:31 2018 -0400

----------------------------------------------------------------------
 libminifi/include/RemoteProcessorGroupPort.h | 28 +++++++++++----
 libminifi/src/RemoteProcessorGroupPort.cpp   | 42 +++++++++++++----------
 2 files changed, 45 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a68ed87e/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index aece744..170bb49 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -44,20 +44,20 @@ namespace minifi {
  * and decrements based on its construction. Using RAII we should
  * never have the concern of thread safety.
  */
-class RPGLatch{
+class RPGLatch {
  public:
-  RPGLatch(bool increment = true){
-    static std::atomic<int> latch_count (0);
+  RPGLatch(bool increment = true) {
+    static std::atomic<int> latch_count(0);
     count = &latch_count;
     if (increment)
       count++;
   }
 
-  ~RPGLatch(){
+  ~RPGLatch() {
     count--;
   }
 
-  int getCount(){
+  int getCount() {
     return *count;
   }
 
@@ -80,8 +80,9 @@ class RemoteProcessorGroupPort : public core::Processor {
         timeout_(0),
         url_(url),
         http_enabled_(false),
+        bypass_rest_api_(false),
         ssl_service(nullptr),
-        logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()){
+        logger_(logging::LoggerFactory<RemoteProcessorGroupPort>::getLogger()) {
     client_type_ = sitetosite::CLIENT_TYPE::RAW;
     stream_factory_ = stream_factory;
     if (uuid != nullptr) {
@@ -168,6 +169,19 @@ class RemoteProcessorGroupPort : public core::Processor {
 
  protected:
 
+  /**
+   * Non static in case anything is loaded when this object is re-scheduled
+   */
+  bool is_http_disabled() {
+    auto ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
+    if (ptr != nullptr) {
+      delete ptr;
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   std::shared_ptr<io::StreamFactory> stream_factory_;
   std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create);
   void returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol);
@@ -195,6 +209,8 @@ class RemoteProcessorGroupPort : public core::Processor {
   // http proxy
   utils::HTTPProxy proxy_;
 
+  bool bypass_rest_api_;
+
   sitetosite::CLIENT_TYPE client_type_;
 
   // Remote Site2Site Info

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a68ed87e/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 629075c..519fec1 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -66,7 +66,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
   if (!available_protocols_.try_dequeue(nextProtocol)) {
     if (create) {
       // create
-      if (url_.empty()) {
+      if (url_.empty() || bypass_rest_api_) {
         sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared<sitetosite::Peer>(protocol_uuid_, host_, port_, ssl_service != nullptr), this->getInterface(), client_type_);
         config.setHTTPProxy(this->proxy_);
         nextProtocol = sitetosite::createClient(config);
@@ -164,6 +164,20 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
     if (peers_.size() > 0)
       peer_index_ = 0;
   }
+  /**
+   * If at this point we have no peers and HTTP support is disabled this means
+   * we must rely on the configured host/port
+   */
+  if (peers_.empty() && is_http_disabled()) {
+    context->getProperty(hostName.getName(), host_);
+
+    int64_t lvalue;
+    if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
+      port_ = static_cast<int>(lvalue);
+      site2site_port_ = port_;
+    }
+    bypass_rest_api_ = true;
+  }
   // populate the site2site protocol for load balancing between them
   if (peers_.size() > 0) {
     auto count = peers_.size();
@@ -183,6 +197,8 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
       logger_->log_trace("Created client, moving into available protocols");
       returnProtocol(std::move(nextProtocol));
     }
+  } else {
+    // we don't have any peers
   }
 }
 
@@ -209,20 +225,6 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessCont
   std::string value;
 
   logger_->log_trace("On trigger %s", getUUIDStr());
-  if (url_.empty()) {
-    if (context->getProperty(hostName.getName(), value) && !value.empty()) {
-      host_ = value;
-    }
-
-    int64_t lvalue;
-    if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) {
-      port_ = static_cast<int>(lvalue);
-    }
-  }
-
-  if (context->getProperty(portUUID.getName(), value) && !value.empty()) {
-    uuid_parse(value.c_str(), protocol_uuid_);
-  }
 
   std::unique_ptr<sitetosite::SiteToSiteClient> protocol_ = nullptr;
   try {
@@ -257,7 +259,6 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
 
   std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/site-to-site";
 
-  this->site2site_port_ = -1;
   configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_);
   configure_->get(Configure::nifi_rest_api_password, this->rest_password_);
 
@@ -290,9 +291,10 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
 
   auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
   if (nullptr == client_ptr) {
-    logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
+    logger_->log_error("Could not locate HTTPClient. You do not have cURL support, defaulting to base configuration!");
     return;
   }
+  this->site2site_port_ = -1;
   client = std::unique_ptr<utils::BaseHTTPClient>(dynamic_cast<utils::BaseHTTPClient*>(client_ptr));
   client->initialize("GET", fullUrl.c_str(), ssl_service);
   if (ssl_service) {
@@ -315,7 +317,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
     const std::vector<char> &response_body = client->getResponseBody();
     if (!response_body.empty()) {
       std::string controller = std::string(response_body.begin(), response_body.end());
-      logger_->log_debug("controller config %s", controller);
+      logger_->log_trace("controller config %s", controller);
       rapidjson::Document doc;
       rapidjson::ParseResult ok = doc.Parse(controller.c_str());
 
@@ -348,8 +350,10 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
 
 void RemoteProcessorGroupPort::refreshPeerList() {
   refreshRemoteSite2SiteInfo();
-  if (site2site_port_ == -1)
+  if (site2site_port_ == -1) {
+    logger_->log_debug("No port configured");
     return;
+  }
 
   this->peers_.clear();