You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/11/18 13:47:12 UTC

[nifi-minifi-cpp] branch main updated (9c439e272 -> 8d20c4482)

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 9c439e272 MINIFICPP-1981 Decrease default C2 heartbeat frequency
     new 9984cf9ba MINIFICPP-1976 - Do not store a whole Relationship object for each transfer
     new d1c0a9967 MINIFICPP-1984 - Restart old flow on failed flow update
     new 8d20c4482 MINIFICPP-1989 - Discard old controller serivice provider from FlowController

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tests/unit/ProcessGroupTestUtils.h             |  2 +-
 .../tests/unit/YamlConfigurationTests.cpp          | 10 +--
 libminifi/include/SchedulingAgent.h                |  3 +
 libminifi/include/core/ProcessSession.h            | 22 +++--
 libminifi/include/core/Processor.h                 |  2 +
 libminifi/include/core/ProcessorNode.h             |  9 +-
 libminifi/src/FlowController.cpp                   | 20 +++--
 libminifi/src/SchedulingAgent.cpp                  | 10 +--
 libminifi/src/core/ProcessSession.cpp              | 97 ++++++++++++----------
 libminifi/src/core/Processor.cpp                   |  4 +
 libminifi/src/core/yaml/YamlConfiguration.cpp      |  2 +
 libminifi/test/flow-tests/FlowControllerTests.cpp  | 36 ++++++++
 libminifi/test/flow-tests/TestControllerWithFlow.h |  7 +-
 13 files changed, 147 insertions(+), 77 deletions(-)


[nifi-minifi-cpp] 02/03: MINIFICPP-1984 - Restart old flow on failed flow update

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d1c0a99675ddb825d81b0ccce88e1dd4aedb8f3c
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Tue Nov 15 09:23:20 2022 +0100

    MINIFICPP-1984 - Restart old flow on failed flow update
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1452
---
 .../standard-processors/tests/unit/ProcessGroupTestUtils.h |  2 +-
 .../tests/unit/YamlConfigurationTests.cpp                  | 10 +++++-----
 libminifi/src/FlowController.cpp                           | 14 ++++++++------
 libminifi/src/core/yaml/YamlConfiguration.cpp              |  2 ++
 4 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
index 0ed125426..58a0dacff 100644
--- a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
+++ b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
@@ -68,7 +68,7 @@ struct Proc {
     return {{
       "- id: " + id,
       "  name: " + name,
-      "  class: LogAttribute"
+      "  class: LogOnDestructionProcessor"
     }};
   }
 };
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 30bdd9b38..f4fd27f29 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -516,8 +516,8 @@ TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
 Flow Controller:
   name: Simple
 Processors:
-- name: PutFile
-  class: PutFile
+- name: GenerateFlowFile
+  class: GenerateFlowFile
   Properties:
      Dynamic Property: Bad
       )";
@@ -525,10 +525,10 @@ Processors:
   std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
 
   REQUIRE(rootFlowConfig);
-  REQUIRE(rootFlowConfig->findProcessorByName("PutFile"));
-  const utils::Identifier uuid = rootFlowConfig->findProcessorByName("PutFile")->getUUID();
+  REQUIRE(rootFlowConfig->findProcessorByName("GenerateFlowFile"));
+  const utils::Identifier uuid = rootFlowConfig->findProcessorByName("GenerateFlowFile")->getUUID();
   REQUIRE(uuid);
-  REQUIRE(!rootFlowConfig->findProcessorByName("PutFile")->getUUIDStr().empty());
+  REQUIRE(!rootFlowConfig->findProcessorByName("GenerateFlowFile")->getUUIDStr().empty());
 
   REQUIRE(LogTestController::getInstance().contains("[warning] Unable to set the dynamic property "
                                                     "Dynamic Property with value Bad"));
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 8d0264a57..a6ca3a3dc 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -142,10 +142,16 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
       load(std::move(root_), true);
       flow_update_ = true;
       started = start() == 0;
+    } catch (const std::exception& ex) {
+      logger_->log_error("Caught exception while starting flow, type %s, what: %s", typeid(ex).name(), ex.what());
     } catch (...) {
-      this->root_ = std::move(prevRoot);
-      load(std::move(this->root_), true);
+      logger_->log_error("Caught unknown exception while starting flow, type %s", getCurrentExceptionTypeName());
+    }
+    if (!started) {
+      logger_->log_error("Failed to start new flow, restarting previous flow");
+      load(std::move(prevRoot), true);
       flow_update_ = true;
+      start();
     }
   }
 
@@ -292,10 +298,6 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload)
       this->root_ = loadInitialFlow();
     }
 
-    if (root_) {
-      root_->verify();
-    }
-
     logger_->log_info("Loaded root processor Group");
     logger_->log_info("Initializing timers");
     controller_service_provider_impl_ = flow_configuration_->getControllerServiceProvider();
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 9f9be9be6..e1f23f3ac 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -135,6 +135,8 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::N
     root->addControllerService(controller_service->getUUIDStr(), controller_service);
   }
 
+  root->verify();
+
   return root;
 }
 


[nifi-minifi-cpp] 01/03: MINIFICPP-1976 - Do not store a whole Relationship object for each transfer

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9984cf9baa7d93468c7923e60bea5be613d0eb3f
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Nov 2 12:02:16 2022 +0100

    MINIFICPP-1976 - Do not store a whole Relationship object for each transfer
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1447
---
 libminifi/include/core/ProcessSession.h | 22 ++++++--
 libminifi/include/core/ProcessorNode.h  |  9 +--
 libminifi/src/SchedulingAgent.cpp       | 10 +---
 libminifi/src/core/ProcessSession.cpp   | 97 +++++++++++++++++++--------------
 4 files changed, 76 insertions(+), 62 deletions(-)

diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 702e35231..208f7f100 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -28,6 +28,7 @@
 #include <algorithm>
 #include <set>
 #include <unordered_map>
+#include <unordered_set>
 
 #include "ProcessContext.h"
 #include "FlowFileRecord.h"
@@ -161,16 +162,25 @@ class ProcessSession : public ReferenceContainer {
     std::shared_ptr<FlowFile> snapshot;
   };
 
+  using Relationships = std::unordered_set<Relationship>;
+
+  Relationships relationships_;
+
+  struct NewFlowFileInfo {
+    std::shared_ptr<core::FlowFile> flow_file;
+    const Relationship* rel{nullptr};
+  };
+
   // FlowFiles being modified by current process session
-  std::map<utils::Identifier, FlowFileUpdate> _updatedFlowFiles;
+  std::map<utils::Identifier, FlowFileUpdate> updated_flowfiles_;
+  // updated FlowFiles being transferred to the relationship
+  std::map<utils::Identifier, const Relationship*> updated_relationships_;
   // FlowFiles being added by current process session
-  std::map<utils::Identifier, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
+  std::map<utils::Identifier, NewFlowFileInfo> added_flowfiles_;
   // FlowFiles being deleted by current process session
-  std::vector<std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
-  // FlowFiles being transfered to the relationship
-  std::map<utils::Identifier, Relationship> _transferRelationship;
+  std::vector<std::shared_ptr<core::FlowFile>> deleted_flowfiles_;
   // FlowFiles being cloned for multiple connections per relationship
-  std::vector<std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
+  std::vector<std::shared_ptr<core::FlowFile>> cloned_flowfiles_;
 
  private:
   enum class RouteResult {
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 7afce4ceb..96014ed9d 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -15,8 +15,7 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORNODE_H_
-#define LIBMINIFI_INCLUDE_CORE_PROCESSORNODE_H_
+#pragma once
 
 #include <memory>
 #include <set>
@@ -167,11 +166,11 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
     processor_->setAutoTerminatedRelationships(relationships);
   }
 
-  bool isAutoTerminated(Relationship relationship) {
+  bool isAutoTerminated(const Relationship& relationship) {
     return processor_->isAutoTerminated(relationship);
   }
 
-  bool isSupportedRelationship(Relationship relationship) {
+  bool isSupportedRelationship(const Relationship& relationship) {
     return processor_->isSupportedRelationship(relationship);
   }
 
@@ -278,5 +277,3 @@ class ProcessorNode : public ConfigurableComponent, public Connectable {
 };
 
 }  // namespace org::apache::nifi::minifi::core
-
-#endif  // LIBMINIFI_INCLUDE_CORE_PROCESSORNODE_H_
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 5ad80fc9a..5b59faa31 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -32,10 +32,7 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
 }
 }  // namespace
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
@@ -141,7 +138,4 @@ void SchedulingAgent::watchDogFunc() {
   }
 }
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index d472fff0e..0181f1e36 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -85,10 +85,10 @@ ProcessSession::~ProcessSession() {
 
 void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
   utils::Identifier uuid = record->getUUID();
-  if (_updatedFlowFiles.find(uuid) != _updatedFlowFiles.end()) {
+  if (updated_flowfiles_.find(uuid) != updated_flowfiles_.end()) {
     throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
   }
-  _addedFlowFiles[uuid] = record;
+  added_flowfiles_[uuid].flow_file = record;
   record->setDeleted(false);
 }
 
@@ -114,7 +114,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<cor
   }
 
   utils::Identifier uuid = record->getUUID();
-  _addedFlowFiles[uuid] = record;
+  added_flowfiles_[uuid].flow_file = record;
   logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
   std::stringstream details;
   details << process_context_->getProcessorNode()->getName() << " creates flow record " << record->getUUIDStr();
@@ -146,7 +146,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(const std::s
   if (flow_version != nullptr) {
     record->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
   }
-  this->_clonedFlowFiles.push_back(record);
+  this->cloned_flowfiles_.push_back(record);
   logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr());
   // Copy attributes
   for (const auto& attribute : parent->getAttributes()) {
@@ -196,7 +196,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core
 
 void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
-  _deletedFlowFiles.push_back(flow);
+  deleted_flowfiles_.push_back(flow);
   std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
 }
@@ -224,14 +224,18 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   utils::Identifier uuid = flow->getUUID();
-  _transferRelationship[uuid] = relationship;
+  if (auto it = added_flowfiles_.find(uuid); it != added_flowfiles_.end()) {
+    it->second.rel = &*relationships_.insert(relationship).first;
+  } else {
+    updated_relationships_[uuid] = &*relationships_.insert(relationship).first;
+  }
   flow->setDeleted(false);
 }
 
 void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
-  gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
-      || _addedFlowFiles.contains(flow->getUUID())
-      || std::any_of(_clonedFlowFiles.begin(), _clonedFlowFiles.end(), [&flow](const auto& flow_file) { return flow == flow_file; }));
+  gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID())
+      || added_flowfiles_.contains(flow->getUUID())
+      || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), [&flow](const auto& flow_file) { return flow == flow_file; }));
 
   std::shared_ptr<ResourceClaim> claim = content_session_->create();
 
@@ -274,9 +278,9 @@ void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_fil
 }
 
 void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
-  gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
-      || _addedFlowFiles.contains(flow->getUUID())
-      || std::any_of(_clonedFlowFiles.begin(), _clonedFlowFiles.end(), [&flow](const auto& flow_file) { return flow == flow_file; }));
+  gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID())
+      || added_flowfiles_.contains(flow->getUUID())
+      || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), [&flow](const auto& flow_file) { return flow == flow_file; }));
 
   std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
   if (!claim) {
@@ -748,11 +752,15 @@ ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<
     return RouteResult::Ok_Deleted;
   }
   utils::Identifier uuid = record->getUUID();
-  auto itRelationship = _transferRelationship.find(uuid);
-  if (itRelationship == _transferRelationship.end()) {
+  Relationship relationship;
+  if (auto it = updated_relationships_.find(uuid); it != updated_relationships_.end()) {
+    gsl_Expects(it->second);
+    relationship = *it->second;
+  } else if (auto new_it = added_flowfiles_.find(uuid); new_it != added_flowfiles_.end() && new_it->second.rel) {
+    relationship = *new_it->second.rel;
+  } else {
     return RouteResult::Error_NoRelationship;
   }
-  Relationship relationship = itRelationship->second;
   // Find the relationship, we need to find the connections for that relationship
   const auto connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName());
   if (connections.empty()) {
@@ -798,7 +806,7 @@ void ProcessSession::commit() {
       transfers[relationship.getName()].transfer_size += record.getSize();
     };
     // First we clone the flow record based on the transferred relationship for updated flow record
-    for (auto && it : _updatedFlowFiles) {
+    for (auto && it : updated_flowfiles_) {
       auto record = it.second.modified;
       if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
         // Can not find relationship for the flow
@@ -807,8 +815,8 @@ void ProcessSession::commit() {
     }
 
     // Do the same thing for added flow file
-    for (const auto& it : _addedFlowFiles) {
-      auto record = it.second;
+    for (const auto& it : added_flowfiles_) {
+      auto record = it.second.flow_file;
       if (routeFlowFile(record, increaseTransferMetrics) == RouteResult::Error_NoRelationship) {
         // Can not find relationship for the flow
         throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the added flow " + record->getUUIDStr());
@@ -819,9 +827,9 @@ void ProcessSession::commit() {
 
     Connectable* connection = nullptr;
     // Complete process the added and update flow files for the session, send the flow file to its queue
-    for (const auto &it : _updatedFlowFiles) {
+    for (const auto &it : updated_flowfiles_) {
       auto record = it.second.modified;
-      logger_->log_trace("See %s in %s", record->getUUIDStr(), "_updatedFlowFiles");
+      logger_->log_trace("See %s in %s", record->getUUIDStr(), "updated_flowfiles_");
       if (record->isDeleted()) {
         continue;
       }
@@ -831,9 +839,9 @@ void ProcessSession::commit() {
         connectionQueues[connection].push_back(record);
       }
     }
-    for (const auto &it : _addedFlowFiles) {
-      auto record = it.second;
-      logger_->log_trace("See %s in %s", record->getUUIDStr(), "_addedFlowFiles");
+    for (const auto &it : added_flowfiles_) {
+      auto record = it.second.flow_file;
+      logger_->log_trace("See %s in %s", record->getUUIDStr(), "added_flowfiles_");
       if (record->isDeleted()) {
         continue;
       }
@@ -843,8 +851,8 @@ void ProcessSession::commit() {
       }
     }
     // Process the clone flow files
-    for (const auto &record : _clonedFlowFiles) {
-      logger_->log_trace("See %s in %s", record->getUUIDStr(), "_clonedFlowFiles");
+    for (const auto &record : cloned_flowfiles_) {
+      logger_->log_trace("See %s in %s", record->getUUIDStr(), "cloned_flowfiles_");
       if (record->isDeleted()) {
         continue;
       }
@@ -854,7 +862,7 @@ void ProcessSession::commit() {
       }
     }
 
-    for (const auto& record : _deletedFlowFiles) {
+    for (const auto& record : deleted_flowfiles_) {
       if (!record->isDeleted()) {
         continue;
       }
@@ -872,7 +880,7 @@ void ProcessSession::commit() {
       throw Exception(PROCESS_SESSION_EXCEPTION, "State manager commit failed.");
     }
 
-    persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
+    persistFlowFilesBeforeTransfer(connectionQueues, updated_flowfiles_);
 
     for (auto& cq : connectionQueues) {
       auto connection = dynamic_cast<Connection*>(cq.first);
@@ -894,12 +902,13 @@ void ProcessSession::commit() {
     }
 
     // All done
-    _updatedFlowFiles.clear();
-    _addedFlowFiles.clear();
-    _clonedFlowFiles.clear();
-    _deletedFlowFiles.clear();
+    updated_flowfiles_.clear();
+    added_flowfiles_.clear();
+    cloned_flowfiles_.clear();
+    deleted_flowfiles_.clear();
 
-    _transferRelationship.clear();
+    updated_relationships_.clear();
+    relationships_.clear();
     // persistent the provenance report
     this->provenance_report_->commit();
     logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode()->getName());
@@ -919,7 +928,7 @@ void ProcessSession::rollback() {
 
   try {
     // Requeue the snapshot of the flowfile back
-    for (const auto &it : _updatedFlowFiles) {
+    for (const auto &it : updated_flowfiles_) {
       auto flowFile = it.second.modified;
       // restore flowFile to original state
       *flowFile = *it.second.snapshot;
@@ -931,7 +940,7 @@ void ProcessSession::rollback() {
       connectionQueues[flowFile->getConnection()].push_back(flowFile);
     }
 
-    for (const auto& record : _deletedFlowFiles) {
+    for (const auto& record : deleted_flowfiles_) {
       record->setDeleted(false);
     }
 
@@ -953,10 +962,11 @@ void ProcessSession::rollback() {
       throw Exception(PROCESS_SESSION_EXCEPTION, "State manager rollback failed.");
     }
 
-    _clonedFlowFiles.clear();
-    _addedFlowFiles.clear();
-    _updatedFlowFiles.clear();
-    _deletedFlowFiles.clear();
+    cloned_flowfiles_.clear();
+    added_flowfiles_.clear();
+    updated_flowfiles_.clear();
+    deleted_flowfiles_.clear();
+    relationships_.clear();
     logger_->log_warn("ProcessSession rollback for %s executed", process_context_->getProcessorNode()->getName());
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught Exception during process session rollback, type: %s, what: %s", typeid(exception).name(), exception.what());
@@ -1097,7 +1107,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       *snapshot = *ret;
       logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
       utils::Identifier uuid = ret->getUUID();
-      _updatedFlowFiles[uuid] = {ret, snapshot};
+      updated_flowfiles_[uuid] = {ret, snapshot};
       auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
       if (flow_version != nullptr) {
         ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
@@ -1127,9 +1137,12 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
 }
 
 bool ProcessSession::existsFlowFileInRelationship(const Relationship &relationship) {
-  return std::any_of(_transferRelationship.begin(), _transferRelationship.end(),
-      [&relationship](const std::map<utils::Identifier, Relationship>::value_type &key_value_pair) {
-        return relationship == key_value_pair.second;
+  return std::any_of(updated_relationships_.begin(), updated_relationships_.end(),
+      [&](const auto& key_value_pair) {
+        return key_value_pair.second && relationship == *key_value_pair.second;
+  }) || std::any_of(added_flowfiles_.begin(), added_flowfiles_.end(),
+      [&](const auto& key_value_pair) {
+        return key_value_pair.second.rel && relationship == *key_value_pair.second.rel;
   });
 }
 


[nifi-minifi-cpp] 03/03: MINIFICPP-1989 - Discard old controller serivice provider from FlowController

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 8d20c448215a2e7bb0e7d4e49422ec172774b8aa
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Wed Nov 16 21:45:24 2022 +0100

    MINIFICPP-1989 - Discard old controller serivice provider from FlowController
    
    Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
    
    This closes #1453
---
 libminifi/include/SchedulingAgent.h                |  3 ++
 libminifi/include/core/Processor.h                 |  2 ++
 libminifi/src/FlowController.cpp                   |  6 +++-
 libminifi/src/core/Processor.cpp                   |  4 +++
 libminifi/test/flow-tests/FlowControllerTests.cpp  | 36 ++++++++++++++++++++++
 libminifi/test/flow-tests/TestControllerWithFlow.h |  7 +++--
 6 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index f02f4773a..3ec3ad892 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -78,6 +78,8 @@ class SchedulingAgent {
       watchDogTimer_.reset(new utils::CallBackTimer(std::chrono::milliseconds(SCHEDULING_WATCHDOG_CHECK_PERIOD_MS), f));
       watchDogTimer_->start();
     }
+
+    logger_->log_trace("Creating scheduling agent");
   }
 
   virtual ~SchedulingAgent() {
@@ -85,6 +87,7 @@ class SchedulingAgent {
     // The destructor of the timer also stops is, but the stop should happen first!
     // Otherwise the callback might access already destructed members.
     watchDogTimer_.reset();
+    logger_->log_trace("Destroying scheduling agent");
   }
 
   // onTrigger, return whether the yield is need
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index bf3164016..81706134f 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -84,6 +84,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state
 
   bool isRunning() override;
 
+  ~Processor() override;
+
   void setScheduledState(ScheduledState state);
 
   ScheduledState getScheduledState() const {
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index a6ca3a3dc..6ef93b35b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -100,6 +100,7 @@ FlowController::~FlowController() {
   protocol_ = nullptr;
   flow_file_repo_ = nullptr;
   provenance_repo_ = nullptr;
+  logger_->log_trace("Destroying FlowController");
 }
 
 bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload) {
@@ -129,7 +130,10 @@ bool FlowController::applyConfiguration(const std::string &source, const std::st
     std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
     stop();
     unload();
-    controller_map_->clear();
+
+    // prepare to accept the new controller service provider from flow_configuration_
+    clearControllerServices();
+
     clearResponseNodes();
     if (metrics_publisher_) {
       metrics_publisher_->clearMetricNodes();
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 26adfe1ef..022941b24 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -79,6 +79,10 @@ Processor::Processor(std::string name, const utils::Identifier& uuid, std::share
   logger_->log_debug("Processor %s created with uuid %s", name_, getUUIDStr());
 }
 
+Processor::~Processor() {
+  logger_->log_debug("Destroying processor %s with uuid %s", name_, getUUIDStr());
+}
+
 bool Processor::isRunning() {
   return (state_ == RUNNING && active_tasks_ > 0);
 }
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 2000132c7..8087da65c 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -35,6 +35,8 @@
 #include "YamlConfiguration.h"
 #include "CustomProcessors.h"
 #include "TestControllerWithFlow.h"
+#include "EmptyFlow.h"
+#include "utils/IntegrationTestUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -263,3 +265,37 @@ TEST_CASE("Extend the waiting period during shutdown", "[TestFlow4]") {
   REQUIRE(sourceProc->trigger_count.load() >= 1);
   REQUIRE(sinkProc->trigger_count.load() >= 3);
 }
+
+TEST_CASE("FlowController destructor releases resources", "[TestFlow5]") {
+  TestControllerWithFlow controller(R"(
+Flow Controller:
+  name: Banana Bread
+Processors:
+- name: GenFF
+  id: 00000000-0000-0000-0000-000000000001
+  class: GenerateFlowFile
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 10 sec
+Connections: []
+Remote Processing Groups: []
+Controller Services: []
+)");
+
+  controller.startFlow();
+
+  REQUIRE(LogTestController::getInstance().countOccurrences("Creating scheduling agent") == 3);
+  LogTestController::getInstance().clear();
+
+  bool update_successful = controller.controller_->applyConfiguration("/flows/1", empty_flow);
+  REQUIRE(update_successful);
+
+  REQUIRE(LogTestController::getInstance().countOccurrences("Creating scheduling agent") == 3);
+  REQUIRE(LogTestController::getInstance().countOccurrences("Destroying scheduling agent") == 3);
+  LogTestController::getInstance().clear();
+
+  // manually destroy the controller
+  controller.controller_.reset();
+
+  REQUIRE(utils::verifyLogLinePresenceInPollTime(0s, "Destroying FlowController"));
+  REQUIRE(LogTestController::getInstance().countOccurrences("Destroying scheduling agent") == 3);
+}
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index b311b15c3..b72a188a9 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -39,6 +39,7 @@ class TestControllerWithFlow: public TestController{
     LogTestController::getInstance().setTrace<core::Processor>();
     LogTestController::getInstance().setTrace<minifi::TimerDrivenSchedulingAgent>();
     LogTestController::getInstance().setTrace<minifi::EventDrivenSchedulingAgent>();
+    LogTestController::getInstance().setTrace<minifi::FlowController>();
 
     home_ = createTempDirectory();
 
@@ -78,8 +79,10 @@ class TestControllerWithFlow: public TestController{
   }
 
   ~TestControllerWithFlow() {
-    controller_->stop();
-    controller_->unload();
+    if (controller_) {
+      controller_->stop();
+      controller_->unload();
+    }
     LogTestController::getInstance().reset();
   }