You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/02 08:47:39 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-937: resolve state file issues and rollover issues. Add test coverage

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new acc6da8  MINIFICPP-937: resolve state file issues and rollover issues. Add test coverage
acc6da8 is described below

commit acc6da8c64ec3a5048119eb16630ab5a6206cdab
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Thu Jun 27 15:20:21 2019 -0400

    MINIFICPP-937: resolve state file issues and rollover issues. Add test coverage
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #601
---
 .../standard-processors/processors/TailFile.cpp    |  44 ++-
 .../tests/unit/TailFileTests.cpp                   | 384 +++++++++++++--------
 libminifi/test/TestBase.cpp                        |   4 +-
 libminifi/test/TestBase.h                          |   2 +-
 4 files changed, 284 insertions(+), 150 deletions(-)

diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index c282df2..e6f1824 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -107,6 +107,10 @@ void TailFile::initialize() {
 void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
 
+  // can perform these in notifyStop, but this has the same outcome
+  tail_states_.clear();
+  state_recovered_ = false;
+
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
@@ -176,6 +180,8 @@ std::string TailFile::trimRight(const std::string& s) {
 void TailFile::parseStateFileLine(char *buf) {
   char *line = buf;
 
+  logger_->log_trace("Received line %s", buf);
+
   while ((line[0] == ' ') || (line[0] == '\t'))
     ++line;
 
@@ -208,9 +214,10 @@ void TailFile::parseStateFileLine(char *buf) {
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
-      tail_states_.insert(std::make_pair(value, TailState { fileLocation, fileName, 0, 0 }));
+      logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
+      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
     } else {
-      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
+      tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 }));
     }
   }
   if (key == "POSITION") {
@@ -218,10 +225,12 @@ void TailFile::parseStateFileLine(char *buf) {
     if (tail_states_.size() != 1) {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
     }
-    tail_states_.begin()->second.currentTailFilePosition_ = std::stoi(value);
+    const auto position = std::stoi(value);
+    logger_->log_debug("Received position %d", position);
+    tail_states_.begin()->second.currentTailFilePosition_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
-    const auto file = key.substr(strlen(CURRENT_STR) + 1);
+    const auto file = key.substr(strlen(CURRENT_STR));
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
       tail_states_[file].path_ = fileLocation;
@@ -231,8 +240,8 @@ void TailFile::parseStateFileLine(char *buf) {
     }
   }
 
-  if (key.find("POSITION.") == 0) {
-    const auto file = key.substr(strlen(POSITION_STR) + 1);
+  if (key.find(POSITION_STR) == 0) {
+    const auto file = key.substr(strlen(POSITION_STR));
     tail_states_[file].currentTailFilePosition_ = std::stoi(value);
   }
 
@@ -250,6 +259,24 @@ bool TailFile::recoverState() {
   for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
     parseStateFileLine(buf);
   }
+
+  /**
+   * recover times and validate that we have paths
+   */
+
+  for (auto &state : tail_states_) {
+    std::string fileLocation, fileName;
+    if (!utils::file::PathUtils::getFileNameAndPath(state.second.current_file_name_, fileLocation, fileName) && state.second.path_.empty()) {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file does not contain a full path and file name");
+    }
+    struct stat sb;
+    const auto fileFullName = state.second.path_ + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
+    if (stat(fileFullName.c_str(), &sb) == 0) {
+      state.second.currentTailFileModificationTime_ = ((uint64_t) (sb.st_mtime) * 1000);
+    }
+  }
+
+  logger_->log_debug("load state file succeeded for %s", state_file_);
   return true;
 }
 
@@ -290,6 +317,8 @@ void TailFile::checkRollOver(TailState &file, const std::string &base_file_name)
       if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) {
         uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
         if (candidateModTime >= file.currentTailFileModificationTime_) {
+          logger_->log_trace("File %s ( short name %s ), disk mod time %llu, struct mod timer %llu , size on disk %llu, position %llu",
+              filename, file.current_file_name_, candidateModTime, file.currentTailFileModificationTime_, sb.st_size, file.currentTailFilePosition_);
           if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ &&
               sb.st_size == file.currentTailFilePosition_) {
             return true;  // Skip the current file as a candidate in case it wasn't updated
@@ -316,6 +345,7 @@ void TailFile::checkRollOver(TailState &file, const std::string &base_file_name)
 
     // Going ahead in the file rolled over
     if (file.current_file_name_ != base_file_name) {
+      logger_->log_debug("Resetting posotion since %s != %s", base_file_name, file.current_file_name_);
       file.currentTailFilePosition_ = 0;
     }
 
@@ -349,7 +379,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
     std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
     struct stat statbuf;
 
-    logger_->log_debug("Tailing file %s", fullPath);
+    logger_->log_debug("Tailing file %s from %llu", fullPath, state.second.currentTailFilePosition_);
     if (stat(fullPath.c_str(), &statbuf) == 0) {
       if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) {
         logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_);
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index e8c3269..71a11bc 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -58,7 +58,7 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
-
+  auto id = tailfile->getUUIDStr();
   plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
 
   char format[] = "/tmp/gt.XXXXXX";
@@ -78,7 +78,245 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
 
   // Delete the test and state file.
   remove(TMP_FILE);
-  remove(STATE_FILE);
+  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+}
+
+TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
+  // Create and write to the test file
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << NEWLINE_FILE;
+  tmpfile.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto id = tailfile->getUUIDStr();
+
+  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+  plan->reset(true);  // start a new but with state file
+
+  std::ofstream appendStream;
+  appendStream.open(TMP_FILE, std::ios_base::app);
+  appendStream << std::endl;
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("position 14"));
+
+  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+
+  LogTestController::getInstance().reset();
+
+  // Delete the test and state file.
+  remove(TMP_FILE);
+  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+}
+
+TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
+  // Create and write to the test file
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << NEWLINE_FILE;
+  tmpfile.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto id = tailfile->getUUIDStr();
+
+  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+  plan->reset(true);  // start a new but with state file
+
+  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("position 0"));
+
+  // if we lose state we restart
+  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+  // Delete the test and state file.
+  remove(TMP_FILE);
+}
+
+TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
+  // Create and write to the test file
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << NEWLINE_FILE << std::endl;
+  tmpfile.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto id = tailfile->getUUIDStr();
+
+  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+  // should stay the same
+  for (int i = 0; i < 5; i++) {
+    plan->reset(true);  // start a new but with state file
+
+    auto statefile = std::string(STATE_FILE) + "." + id;
+
+    remove(statefile.c_str());
+
+    std::ofstream newstatefile;
+    newstatefile.open(statefile);
+    newstatefile << "FILENAME=" << TMP_FILE << std::endl;
+    newstatefile << "POSITION=14" << std::endl;
+    newstatefile.close();
+
+    testController.runSession(plan, true);
+
+    REQUIRE(LogTestController::getInstance().contains("position 14"));
+
+    // if we lose state we restart
+    REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+  }
+  for (int i = 14; i < 34; i++) {
+    plan->reset(true);  // start a new but with state file
+
+    auto statefile = std::string(STATE_FILE) + "." + id;
+
+    remove(statefile.c_str());
+
+    std::ofstream newstatefile;
+    newstatefile.open(statefile);
+    newstatefile << "FILENAME=" << TMP_FILE << std::endl;
+    newstatefile << "POSITION=" << i << std::endl;
+    newstatefile.close();
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("position " + std::to_string(i)));
+  }
+
+  plan->runCurrentProcessor();
+  for (int i = 14; i < 34; i++) {
+    REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile." + std::to_string(i) + "-34.txt"));
+  }
+  // Delete the test and state file.
+  remove(TMP_FILE);
+
+  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+}
+
+TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
+  // Create and write to the test file
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << NEWLINE_FILE;
+  tmpfile.close();
+
+  TestController testController;
+  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+  LogTestController::getInstance().setDebug<core::ProcessSession>();
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto id = tailfile->getUUIDStr();
+
+  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+  plan->reset(true);  // start a new but with state file
+
+  auto statefile = std::string(STATE_FILE) + "." + id;
+
+  remove(statefile.c_str());
+
+  SECTION("No Filename") {
+  std::ofstream newstatefile;
+  newstatefile.open(statefile);
+  newstatefile << "POSITION=14" << std::endl;
+  newstatefile.close();
+  REQUIRE_THROWS(testController.runSession(plan, true));
+}
+
+  SECTION("Invalid current filename") {
+  std::ofstream newstatefile;
+  newstatefile.open(statefile);
+  newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
+  newstatefile << "CURRENT.minifi-tempfile.txt=minifi-tmpfile.txt" << std::endl;
+  newstatefile << "POSITION=14" << std::endl;
+  newstatefile.close();
+  REQUIRE_THROWS(testController.runSession(plan, true));
+}
+  SECTION("No current filename and partial path") {
+  std::ofstream newstatefile;
+  newstatefile.open(statefile);
+  newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
+  newstatefile << "POSITION=14" << std::endl;
+  newstatefile.close();
+  REQUIRE_THROWS(testController.runSession(plan, true));
+}
+
+// Delete the test and state file.
+  remove(TMP_FILE);
+
+  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
 }
 
 TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
@@ -89,6 +327,7 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto id = tailfile->getUUIDStr();
 
   plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
 
@@ -124,7 +363,7 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
-  remove(STATE_FILE);
+  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
 }
 
 TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
@@ -133,6 +372,7 @@ TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
   std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto id = tailfile->getUUIDStr();
 
   plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
 
@@ -293,141 +533,3 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
   REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
 }
 
-/*
- TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
-
- TestController testController;
- LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
- LogTestController::getInstance().setDebug<core::ProcessSession>();
- LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "TailFile successful output"));
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
-
- connection->setSourceUUID(processoruuid);
-
- processor->addConnection(connection);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
-
- core::ProcessSession session(&context);
-
- REQUIRE(processor->getName() == "tailfile");
-
- core::ProcessSessionFactory factory(&context);
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 4);   // 2 creates and 2 modifies for flowfiles
-
- LogTestController::getInstance().reset();
- } catch (...) {
- }
-
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
- }
-
-
- TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
-
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "TailFile successful output"));
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
- connection->setSourceUUID(processoruuid);
-
- processor->addConnection(connection);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
-
- core::ProcessSession session(&context);
-
- REQUIRE(processor->getName() == "tailfile");
-
- core::ProcessSessionFactory factory(&context);
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 2);
-
- LogTestController::getInstance().reset();
- } catch (...) {
- }
-
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
- }
- */
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index f496b3f..be42952 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -149,11 +149,13 @@ bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const st
   }
 }
 
-void TestPlan::reset() {
+void TestPlan::reset(bool reschedule) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   process_sessions_.clear();
   factories_.clear();
   location = -1;
+  if (reschedule)
+    configured_processors_.clear();
   for (auto proc : processor_queue_) {
     while (proc->getActiveTasks() > 0) {
       proc->decrementActiveTask();
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 21a2f43..71013c6 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -192,7 +192,7 @@ class TestPlan {
 
   bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
 
-  void reset();
+  void reset(bool reschedule = false);
 
   bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);