Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/22 09:19:20 UTC

[GitHub] [nifi-minifi-cpp] fgerlits opened a new pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

fgerlits opened a new pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013


   https://issues.apache.org/jira/browse/MINIFICPP-1487
   
   If the number of incoming flow files is non-zero and is the same before and after onTrigger(), then the processor should yield.  This can happen, for example, if all the incoming flow files are penalized.  In this case, without a yield, the processor would get triggered repeatedly in a tight loop, causing 100% CPU usage (of one core).
   
   The unit test is long and ugly, but we can't use `TestController::runSession()`, because that calls `Processor::onTrigger(shared_ptr<ProcessContext>, shared_ptr<ProcessSession>)` directly, rather than going through `Processor::onTrigger(shared_ptr<ProcessContext>, shared_ptr<ProcessSessionFactory>)` as happens in production.
   
   ---
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [x] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1013: MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r589450106



##########
File path: libminifi/src/Connection.cpp
##########
@@ -227,8 +212,7 @@ void Connection::drain(bool delete_permanently) {
   std::lock_guard<std::mutex> lock(mutex_);

Review comment:
       I don't think we can (or want to) stop new flow files from being added to the connection queue after a `drain()`, and I would say a `put()` which resumes after it has acquired the lock happens after the `drain()`, even if it started before.
   
   But yes, I agree it's not related to this change. :)
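
A minimal sketch of the locking pattern under discussion, assuming a hypothetical `ToyConnection` rather than the real `minifi::Connection`. Because `put()` and `drain()` take the same mutex, a `put()` that was blocked during a `drain()` simply runs afterwards, and its flow file stays in the queue:

```cpp
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for minifi::Connection; only the locking pattern matters here.
class ToyConnection {
 public:
  void put(int flow_file) {
    std::lock_guard<std::mutex> lock(mutex_);  // blocks while drain() holds the lock
    queue_.push_back(flow_file);
  }

  void drain() {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.clear();
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::deque<int> queue_;
};
```

With this shape, a queue that was just drained can legitimately be non-empty a moment later; the mutex only orders the two operations, it does not forbid later insertions.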







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580306578



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses
+  uint64_t num_incoming_flow_files_queued_after = numIncomingFlowFilesQueued();
+
+  if (num_incoming_flow_files_queued_before != 0 && num_incoming_flow_files_queued_before == num_incoming_flow_files_queued_after) {

Review comment:
       As we don't hold a lock on the incoming connection during the whole `onTrigger()` execution, flow files can be added there in the meantime. This condition can therefore be satisfied during normal operation.
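
The interleaving can be sketched as follows. `ToyQueue` and `heuristicYieldsDespiteProgress()` are hypothetical names, not part of MiNiFi, and the upstream insertion is run inline so that the example is deterministic; in production it would happen on another thread between the two counts:

```cpp
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical stand-in for an incoming connection queue; not the real minifi::Connection.
class ToyQueue {
 public:
  void put(int flow_file) {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push_back(flow_file);
  }

  // Removes one item if there is one; returns whether anything was removed.
  bool poll() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) return false;
    items_.pop_front();
    return true;
  }

  uint64_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return items_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::deque<int> items_;
};

// Returns true if the before/after comparison would decide to yield
// even though the processor did consume a flow file.
bool heuristicYieldsDespiteProgress() {
  ToyQueue queue;
  queue.put(1);
  queue.put(2);

  const uint64_t before = queue.size();  // 2 flow files queued
  const bool processed = queue.poll();   // the processor consumes one
  queue.put(3);                          // an upstream processor adds one
  const uint64_t after = queue.size();   // 2 flow files queued again

  return processed && before != 0 && before == after;
}
```

Here the counts match although work was done, so the comparison alone cannot tell "nothing processed" apart from "one processed, one added".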







[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #1013: MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013


   





[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580930418



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses
+  uint64_t num_incoming_flow_files_queued_after = numIncomingFlowFilesQueued();
+
+  if (num_incoming_flow_files_queued_before != 0 && num_incoming_flow_files_queued_before == num_incoming_flow_files_queued_after) {

Review comment:
       Yes, good point -- I'll need to rethink this.







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580934162



##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -614,3 +614,109 @@ TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
 TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
+
+class TestProcessorProcessIncomingQueue : public minifi::core::Processor {
+ public:
+  explicit TestProcessorProcessIncomingQueue(std::string name, utils::Identifier uuid = {}) : Processor(name, uuid) {
+    setSupportedProperties({NumFlowFiles});
+  }
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override {
+    context->getProperty(NumFlowFiles.getName(), num_flow_files_);
+  }
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) override {
+    for (uint32_t i = 0; i < num_flow_files_; ++i) {
+      auto flow_file = session->get();
+      if (flow_file) {
+        session->remove(flow_file);
+      }
+    }
+  }
+
+  static core::Property NumFlowFiles;
+  uint32_t num_flow_files_ = 0;
+};
+
+core::Property TestProcessorProcessIncomingQueue::NumFlowFiles = core::PropertyBuilder::createProperty("Number of Flow Files to Process")
+    ->withDefaultValue<uint32_t>(0)
+    ->build();
+
+REGISTER_RESOURCE(TestProcessorProcessIncomingQueue, "A mock processor that processes a configurable number of incoming flow files");
+
+bool testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(uint32_t num_incoming_, uint32_t num_processed_) {
+  LogTestController::getInstance().setDebug<core::Processor>();
+  LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+
+  const auto repo = std::make_shared<TestRepository>();
+  const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
+
+  const std::shared_ptr<core::Processor> generate_flow_file = std::make_shared<processors::GenerateFlowFile>("generate_flow_file");
+  const auto generate_flow_file_node = std::make_shared<core::ProcessorNode>(generate_flow_file);
+  generate_flow_file->initialize();
+  const auto gff_context = std::make_shared<core::ProcessContext>(generate_flow_file_node, nullptr, repo, repo, content_repo);
+  gff_context->setProperty(processors::GenerateFlowFile::BatchSize, std::to_string(num_incoming_));
+
+  const std::shared_ptr<core::Processor> process_incoming_queue = std::make_shared<TestProcessorProcessIncomingQueue>("process_incoming_queue");
+  const auto process_incoming_queue_node = std::make_shared<core::ProcessorNode>(process_incoming_queue);
+  process_incoming_queue->initialize();
+  const auto piq_context = std::make_shared<core::ProcessContext>(process_incoming_queue_node, nullptr, repo, repo, content_repo);
+  piq_context->setProperty(TestProcessorProcessIncomingQueue::NumFlowFiles, std::to_string(num_processed_));
+
+  const auto connection = std::make_shared<minifi::Connection>(repo, content_repo, "ggf_to_piq");
+  connection->addRelationship(core::Relationship{"success", ""});
+  connection->setSourceUUID(generate_flow_file->getUUID());
+  connection->setDestinationUUID(process_incoming_queue->getUUID());
+  generate_flow_file->addConnection(connection);
+  process_incoming_queue->addConnection(connection);
+
+  const auto gff_session_factory = std::make_shared<core::ProcessSessionFactory>(gff_context);
+  generate_flow_file->setScheduledState(core::ScheduledState::RUNNING);
+  generate_flow_file->onSchedule(gff_context, gff_session_factory);
+  generate_flow_file->onTrigger(gff_context, gff_session_factory);
+
+  const auto piq_session_factory = std::make_shared<core::ProcessSessionFactory>(piq_context);
+  process_incoming_queue->setScheduledState(core::ScheduledState::RUNNING);
+  process_incoming_queue->onSchedule(piq_context, piq_session_factory);
+  process_incoming_queue->onTrigger(piq_context, piq_session_factory);
+
+  return process_incoming_queue->isYield();
+}
+
+TEST_CASE("If there are no incoming flow files, then there is no automatic yield", "[AutomaticYield]") {
+  SECTION("0 flow files in the queue, we don't try to process any") {
+    auto is_yield = testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(0, 0);
+    REQUIRE_FALSE(is_yield);
+  }
+  SECTION("0 flow files in the queue, we try to process one") {

Review comment:
       We call `session->get()` once, which is what I meant by "try to process one".
   
   Which function do you mean?  Probably best to discuss offline.







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580932043



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -223,7 +223,18 @@ bool Processor::flowFilesQueued() {
   return false;
 }
 
-bool Processor::flowFilesOutGoingFull() {
+uint64_t Processor::numIncomingFlowFilesQueued() const {
+  std::lock_guard<std::mutex> lock{mutex_};
+
+  uint64_t sum = 0;
+  for (const auto& connectable : _incomingConnections) {
+    auto connection = std::static_pointer_cast<Connection>(connectable);

Review comment:
       It does seem like we always store Connections in it, so this is a good idea, although not related to this change.







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580349065



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses

Review comment:
       The lock is released in between these calls. Is there really no issue when this function is called on multiple threads? E.g., can this case not happen?
   1. Count flowfiles before
   1. Call onTrigger
   1. Add files to incoming connections on a different thread
   1. Count flowfiles after







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580366848



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -223,7 +223,18 @@ bool Processor::flowFilesQueued() {
   return false;
 }
 
-bool Processor::flowFilesOutGoingFull() {
+uint64_t Processor::numIncomingFlowFilesQueued() const {
+  std::lock_guard<std::mutex> lock{mutex_};
+
+  uint64_t sum = 0;
+  for (const auto& connectable : _incomingConnections) {
+    auto connection = std::static_pointer_cast<Connection>(connectable);

Review comment:
       Does this assume that on a connectable, every incoming connection is actually a `Connection`?  If so, why not change the type of the `_incomingConnections` to a `std::set<std::shared_ptr<Connection>>`?
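
A sketch of the two shapes being compared, with hypothetical toy versions of `Connectable` and `Connection` (the real classes have many more members, and the container type of `_incomingConnections` is assumed from the cast in the diff):

```cpp
#include <cstdint>
#include <memory>
#include <set>

// Hypothetical, stripped-down versions of the real classes.
struct Connectable {
  virtual ~Connectable() = default;
};

struct Connection : Connectable {
  explicit Connection(uint64_t size) : queue_size(size) {}
  uint64_t queue_size;
};

// Shape used in the diff: the set holds base pointers, so each use needs a downcast,
// which is only safe if every element really is a Connection.
uint64_t sumWithCast(const std::set<std::shared_ptr<Connectable>>& incoming) {
  uint64_t sum = 0;
  for (const auto& connectable : incoming) {
    sum += std::static_pointer_cast<Connection>(connectable)->queue_size;
  }
  return sum;
}

// Suggested shape: the element type already says Connection, so no cast is needed.
uint64_t sumTyped(const std::set<std::shared_ptr<Connection>>& incoming) {
  uint64_t sum = 0;
  for (const auto& connection : incoming) {
    sum += connection->queue_size;
  }
  return sum;
}
```

The typed container moves the "every element is a `Connection`" assumption from each call site into the type system.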







[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1013: MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r589321210



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses
+  uint64_t num_incoming_flow_files_queued_after = numIncomingFlowFilesQueued();
+
+  if (num_incoming_flow_files_queued_before != 0 && num_incoming_flow_files_queued_before == num_incoming_flow_files_queued_after) {

Review comment:
       I have rewritten the code to use a different approach: now `isWorkAvailable()` returns false when all the incoming flow files are penalized.  This will cause the processor to yield for a `nifi.bored.yield.duration` amount of time, so it will no longer spin in a tight loop.
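
A rough sketch of the idea, not the actual implementation in the PR: `ToyFlowFile` and the free function below are hypothetical, and only the penalty expiry is modelled. Work counts as available only if at least one queued flow file is not currently penalized:

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical flow file record: only the penalty expiry matters for this sketch.
struct ToyFlowFile {
  Clock::time_point penalty_expires_at;

  bool isPenalized(Clock::time_point now) const { return penalty_expires_at > now; }
};

// Returns true only if some queued flow file can be processed right now.
// An empty queue and an all-penalized queue both report "no work".
bool isWorkAvailable(const std::vector<ToyFlowFile>& incoming_queue, Clock::time_point now) {
  return std::any_of(incoming_queue.begin(), incoming_queue.end(),
                     [now](const ToyFlowFile& flow_file) { return !flow_file.isPenalized(now); });
}
```

A scheduler that sees `false` here can back off for the configured `nifi.bored.yield.duration` instead of triggering the processor again immediately.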







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580927351



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses

Review comment:
       https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580306578
   
   That's what I say, too. :)










[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580932500



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses
+  uint64_t num_incoming_flow_files_queued_after = numIncomingFlowFilesQueued();
+
+  if (num_incoming_flow_files_queued_before != 0 && num_incoming_flow_files_queued_before == num_incoming_flow_files_queued_after) {
+    logger_->log_info("Processor %s processed no incoming flow files (out of %" PRIu64 "), so it will yield", name_, num_incoming_flow_files_queued_before);
+    yield();
+  }
+}
+
+void Processor::callOnTrigger(ProcessContext* context, ProcessSession* session) {

Review comment:
       They call different `onTrigger()` overloads.  But I will need a different approach in any case, so this is moot.
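
To illustrate why one wrapper cannot simply forward to the other, here is a sketch with hypothetical toy classes (not the real `Processor` hierarchy). Each `callOnTrigger()` dispatches to the `onTrigger()` overload with matching parameter types, and a subclass may override only one of them:

```cpp
#include <memory>
#include <string>

// Hypothetical placeholders for ProcessContext and ProcessSession.
struct ToyContext {};
struct ToySession {};

class ToyProcessor {
 public:
  virtual ~ToyProcessor() = default;

  // Two independent virtual overloads; a subclass may override either one.
  virtual void onTrigger(const std::shared_ptr<ToyContext>&, const std::shared_ptr<ToySession>&) {
    last_called = "base shared_ptr";
  }
  virtual void onTrigger(ToyContext*, ToySession*) {
    last_called = "base raw pointer";
  }

  // Each wrapper forwards to the overload with the matching parameter types.
  void callOnTrigger(const std::shared_ptr<ToyContext>& context, const std::shared_ptr<ToySession>& session) {
    onTrigger(context, session);
  }
  void callOnTrigger(ToyContext* context, ToySession* session) {
    onTrigger(context, session);
  }

  std::string last_called;
};

class SharedPtrOnlyProcessor : public ToyProcessor {
 public:
  using ToyProcessor::onTrigger;  // keep the raw-pointer overload visible
  void onTrigger(const std::shared_ptr<ToyContext>&, const std::shared_ptr<ToySession>&) override {
    last_called = "subclass shared_ptr";
  }
};
```

If the `shared_ptr` wrapper forwarded to the raw-pointer wrapper via `.get()`, the override in `SharedPtrOnlyProcessor` would never run.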







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #1013: MINIFICPP-1487 Do not trigger the processor if the incoming queue has penalized flow files only

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r589434954



##########
File path: libminifi/src/Connection.cpp
##########
@@ -227,8 +212,7 @@ void Connection::drain(bool delete_permanently) {
   std::lock_guard<std::mutex> lock(mutex_);

Review comment:
       This might not be related to this PR, but are we happy with just locking here? Can a blocked insertion not resume after the queue is supposedly drained?







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580382072



##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -614,3 +614,109 @@ TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
 TEST_CASE("TestRPGValid", "[TestRPG6]") {
   testRPGBypass("", "8080", "8080", false);
 }
+
+class TestProcessorProcessIncomingQueue : public minifi::core::Processor {
+ public:
+  explicit TestProcessorProcessIncomingQueue(std::string name, utils::Identifier uuid = {}) : Processor(name, uuid) {
+    setSupportedProperties({NumFlowFiles});
+  }
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override {
+    context->getProperty(NumFlowFiles.getName(), num_flow_files_);
+  }
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>& session) override {
+    for (uint32_t i = 0; i < num_flow_files_; ++i) {
+      auto flow_file = session->get();
+      if (flow_file) {
+        session->remove(flow_file);
+      }
+    }
+  }
+
+  static core::Property NumFlowFiles;
+  uint32_t num_flow_files_ = 0;
+};
+
+core::Property TestProcessorProcessIncomingQueue::NumFlowFiles = core::PropertyBuilder::createProperty("Number of Flow Files to Process")
+    ->withDefaultValue<uint32_t>(0)
+    ->build();
+
+REGISTER_RESOURCE(TestProcessorProcessIncomingQueue, "A mock processor that processes a configurable number of incoming flow files");
+
+bool testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(uint32_t num_incoming_, uint32_t num_processed_) {
+  LogTestController::getInstance().setDebug<core::Processor>();
+  LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+
+  const auto repo = std::make_shared<TestRepository>();
+  const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
+
+  const std::shared_ptr<core::Processor> generate_flow_file = std::make_shared<processors::GenerateFlowFile>("generate_flow_file");
+  const auto generate_flow_file_node = std::make_shared<core::ProcessorNode>(generate_flow_file);
+  generate_flow_file->initialize();
+  const auto gff_context = std::make_shared<core::ProcessContext>(generate_flow_file_node, nullptr, repo, repo, content_repo);
+  gff_context->setProperty(processors::GenerateFlowFile::BatchSize, std::to_string(num_incoming_));
+
+  const std::shared_ptr<core::Processor> process_incoming_queue = std::make_shared<TestProcessorProcessIncomingQueue>("process_incoming_queue");
+  const auto process_incoming_queue_node = std::make_shared<core::ProcessorNode>(process_incoming_queue);
+  process_incoming_queue->initialize();
+  const auto piq_context = std::make_shared<core::ProcessContext>(process_incoming_queue_node, nullptr, repo, repo, content_repo);
+  piq_context->setProperty(TestProcessorProcessIncomingQueue::NumFlowFiles, std::to_string(num_processed_));
+
+  const auto connection = std::make_shared<minifi::Connection>(repo, content_repo, "ggf_to_piq");
+  connection->addRelationship(core::Relationship{"success", ""});
+  connection->setSourceUUID(generate_flow_file->getUUID());
+  connection->setDestinationUUID(process_incoming_queue->getUUID());
+  generate_flow_file->addConnection(connection);
+  process_incoming_queue->addConnection(connection);
+
+  const auto gff_session_factory = std::make_shared<core::ProcessSessionFactory>(gff_context);
+  generate_flow_file->setScheduledState(core::ScheduledState::RUNNING);
+  generate_flow_file->onSchedule(gff_context, gff_session_factory);
+  generate_flow_file->onTrigger(gff_context, gff_session_factory);
+
+  const auto piq_session_factory = std::make_shared<core::ProcessSessionFactory>(piq_context);
+  process_incoming_queue->setScheduledState(core::ScheduledState::RUNNING);
+  process_incoming_queue->onSchedule(piq_context, piq_session_factory);
+  process_incoming_queue->onTrigger(piq_context, piq_session_factory);
+
+  return process_incoming_queue->isYield();
+}
+
+TEST_CASE("If there are no incoming flow files, then there is no automatic yield", "[AutomaticYield]") {
+  SECTION("0 flow files in the queue, we don't try to process any") {
+    auto is_yield = testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(0, 0);
+    REQUIRE_FALSE(is_yield);
+  }
+  SECTION("0 flow files in the queue, we try to process one") {

Review comment:
       This description is a bit misleading, as we don't call `ProcessSession::remove` if there is nothing to process. It might also be worth updating this function not to take `nullptr` as its arg.







[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #1013: MINIFICPP-1487 Always yield if no input flow files were processed

Posted by GitBox <gi...@apache.org>.
hunyadi-dev commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580368063



##########
File path: libminifi/src/core/Processor.cpp
##########
@@ -277,6 +286,28 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const
   }
 }
 
+void Processor::callOnTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
+  uint64_t num_incoming_flow_files_queued_before = numIncomingFlowFilesQueued();
+  onTrigger(context, session);  // virtual method overridden by subclasses
+  uint64_t num_incoming_flow_files_queued_after = numIncomingFlowFilesQueued();
+
+  if (num_incoming_flow_files_queued_before != 0 && num_incoming_flow_files_queued_before == num_incoming_flow_files_queued_after) {
+    logger_->log_info("Processor %s processed no incoming flow files (out of %" PRIu64 "), so it will yield", name_, num_incoming_flow_files_queued_before);
+    yield();
+  }
+}
+
+void Processor::callOnTrigger(ProcessContext* context, ProcessSession* session) {

Review comment:
       Is this different from the other overload? Can the other one not just call this one?



