You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2019/07/02 08:47:39 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-937: resolve
state file issues and rollover issues. Add test coverage
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new acc6da8 MINIFICPP-937: resolve state file issues and rollover issues. Add test coverage
acc6da8 is described below
commit acc6da8c64ec3a5048119eb16630ab5a6206cdab
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Thu Jun 27 15:20:21 2019 -0400
MINIFICPP-937: resolve state file issues and rollover issues. Add test coverage
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #601
---
.../standard-processors/processors/TailFile.cpp | 44 ++-
.../tests/unit/TailFileTests.cpp | 384 +++++++++++++--------
libminifi/test/TestBase.cpp | 4 +-
libminifi/test/TestBase.h | 2 +-
4 files changed, 284 insertions(+), 150 deletions(-)
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index c282df2..e6f1824 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -107,6 +107,10 @@ void TailFile::initialize() {
void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
+ // can perform these in notifyStop, but this has the same outcome
+ tail_states_.clear();
+ state_recovered_ = false;
+
std::string value;
if (context->getProperty(Delimiter.getName(), value)) {
@@ -176,6 +180,8 @@ std::string TailFile::trimRight(const std::string& s) {
void TailFile::parseStateFileLine(char *buf) {
char *line = buf;
+ logger_->log_trace("Received line %s", buf);
+
while ((line[0] == ' ') || (line[0] == '\t'))
++line;
@@ -208,9 +214,10 @@ void TailFile::parseStateFileLine(char *buf) {
if (key == "FILENAME") {
std::string fileLocation, fileName;
if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
- tail_states_.insert(std::make_pair(value, TailState { fileLocation, fileName, 0, 0 }));
+ logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
+ tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
} else {
- throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
+ tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 }));
}
}
if (key == "POSITION") {
@@ -218,10 +225,12 @@ void TailFile::parseStateFileLine(char *buf) {
if (tail_states_.size() != 1) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
}
- tail_states_.begin()->second.currentTailFilePosition_ = std::stoi(value);
+ const auto position = std::stoi(value);
+ logger_->log_debug("Received position %d", position);
+ tail_states_.begin()->second.currentTailFilePosition_ = position;
}
if (key.find(CURRENT_STR) == 0) {
- const auto file = key.substr(strlen(CURRENT_STR) + 1);
+ const auto file = key.substr(strlen(CURRENT_STR));
std::string fileLocation, fileName;
if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
tail_states_[file].path_ = fileLocation;
@@ -231,8 +240,8 @@ void TailFile::parseStateFileLine(char *buf) {
}
}
- if (key.find("POSITION.") == 0) {
- const auto file = key.substr(strlen(POSITION_STR) + 1);
+ if (key.find(POSITION_STR) == 0) {
+ const auto file = key.substr(strlen(POSITION_STR));
tail_states_[file].currentTailFilePosition_ = std::stoi(value);
}
@@ -250,6 +259,24 @@ bool TailFile::recoverState() {
for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
parseStateFileLine(buf);
}
+
+ /**
+ * recover times and validate that we have paths
+ */
+
+ for (auto &state : tail_states_) {
+ std::string fileLocation, fileName;
+ if (!utils::file::PathUtils::getFileNameAndPath(state.second.current_file_name_, fileLocation, fileName) && state.second.path_.empty()) {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file does not contain a full path and file name");
+ }
+ struct stat sb;
+ const auto fileFullName = state.second.path_ + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
+ if (stat(fileFullName.c_str(), &sb) == 0) {
+ state.second.currentTailFileModificationTime_ = ((uint64_t) (sb.st_mtime) * 1000);
+ }
+ }
+
+ logger_->log_debug("load state file succeeded for %s", state_file_);
return true;
}
@@ -290,6 +317,8 @@ void TailFile::checkRollOver(TailState &file, const std::string &base_file_name)
if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) {
uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
if (candidateModTime >= file.currentTailFileModificationTime_) {
+ logger_->log_trace("File %s ( short name %s ), disk mod time %llu, struct mod timer %llu , size on disk %llu, position %llu",
+ filename, file.current_file_name_, candidateModTime, file.currentTailFileModificationTime_, sb.st_size, file.currentTailFilePosition_);
if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ &&
sb.st_size == file.currentTailFilePosition_) {
return true; // Skip the current file as a candidate in case it wasn't updated
@@ -316,6 +345,7 @@ void TailFile::checkRollOver(TailState &file, const std::string &base_file_name)
// Going ahead in the file rolled over
if (file.current_file_name_ != base_file_name) {
+ logger_->log_debug("Resetting posotion since %s != %s", base_file_name, file.current_file_name_);
file.currentTailFilePosition_ = 0;
}
@@ -349,7 +379,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
struct stat statbuf;
- logger_->log_debug("Tailing file %s", fullPath);
+ logger_->log_debug("Tailing file %s from %llu", fullPath, state.second.currentTailFilePosition_);
if (stat(fullPath.c_str(), &statbuf) == 0) {
if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) {
logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_);
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index e8c3269..71a11bc 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -58,7 +58,7 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
-
+ auto id = tailfile->getUUIDStr();
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
@@ -78,7 +78,245 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
// Delete the test and state file.
remove(TMP_FILE);
- remove(STATE_FILE);
+ remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+}
+
+TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
+ // Create the file to be tailed and write the initial content (NEWLINE_FILE) to it
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
+
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto id = tailfile->getUUIDStr();
+
+ plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format); // NOTE(review): dir is never read in this test
+
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+ plan->reset(true); // start a new run with rescheduling enabled, keeping the existing state file
+
+ std::ofstream appendStream;
+ appendStream.open(TMP_FILE, std::ios_base::app);
+ appendStream << std::endl; // std::endl appends a newline and flushes the stream before the next run
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("position 14"));
+
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+
+ LogTestController::getInstance().reset();
+
+ // Clean up: delete the tailed file and the per-processor state file (STATE_FILE + "." + processor UUID).
+ remove(TMP_FILE);
+ remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+}
+
+TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
+ // Create the file to be tailed and write the initial content (NEWLINE_FILE) to it
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
+
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto id = tailfile->getUUIDStr();
+
+ plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format); // NOTE(review): dir is never read in this test
+
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+ plan->reset(true); // start a new run with rescheduling enabled; the state file is deleted right below
+
+ remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("position 0"));
+
+ // With the state file gone, tailing is expected to restart from offset 0. NOTE(review): the log is not reset after the first run, so this check may already be satisfied by the earlier output - confirm
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+ // Delete the tailed file. NOTE(review): the state file is only removed before the second run; if that run writes a new one it is left behind - confirm
+ remove(TMP_FILE);
+}
+
+TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
+ // Create the file to be tailed: NEWLINE_FILE followed by one extra newline (std::endl)
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE << std::endl;
+ tmpfile.close();
+
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto id = tailfile->getUUIDStr();
+
+ plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format); // NOTE(review): dir is never read in this test
+
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+ // Rewriting the same state (POSITION=14) before each of five runs should give the same result every time
+ for (int i = 0; i < 5; i++) {
+ plan->reset(true); // start a new run with rescheduling enabled; the state file is replaced below
+
+ auto statefile = std::string(STATE_FILE) + "." + id;
+
+ remove(statefile.c_str());
+
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ newstatefile << "FILENAME=" << TMP_FILE << std::endl;
+ newstatefile << "POSITION=14" << std::endl;
+ newstatefile.close();
+
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("position 14"));
+
+ // The state file says POSITION=14, so tailing is expected to resume from offset 14 rather than restart
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
+ }
+ for (int i = 14; i < 34; i++) {
+ plan->reset(true); // start a new run with rescheduling enabled; the state file is replaced below with POSITION=i
+
+ auto statefile = std::string(STATE_FILE) + "." + id;
+
+ remove(statefile.c_str());
+
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ newstatefile << "FILENAME=" << TMP_FILE << std::endl;
+ newstatefile << "POSITION=" << i << std::endl;
+ newstatefile.close();
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("position " + std::to_string(i)));
+ }
+
+ plan->runCurrentProcessor();
+ for (int i = 14; i < 34; i++) {
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile." + std::to_string(i) + "-34.txt"));
+ }
+ // Clean up: delete the tailed file and the per-processor state file (STATE_FILE + "." + processor UUID).
+ remove(TMP_FILE);
+
+ remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+}
+
+TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
+ // Create the file to be tailed and write the initial content (NEWLINE_FILE) to it
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
+
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto id = tailfile->getUUIDStr();
+
+ plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format); // NOTE(review): dir is never read in this test
+
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+ testController.runSession(plan, true);
+
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+
+ plan->reset(true); // start a new run with rescheduling enabled; each SECTION below writes its own invalid state file
+
+ auto statefile = std::string(STATE_FILE) + "." + id;
+
+ remove(statefile.c_str());
+
+ SECTION("No Filename") { // state file holds POSITION only, with no FILENAME entry; the run is expected to throw
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ newstatefile << "POSITION=14" << std::endl;
+ newstatefile.close();
+ REQUIRE_THROWS(testController.runSession(plan, true));
+}
+
+ SECTION("Invalid current filename") { // the CURRENT.* key names a different file ("minifi-tempfile.txt") and no value carries a directory
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
+ newstatefile << "CURRENT.minifi-tempfile.txt=minifi-tmpfile.txt" << std::endl;
+ newstatefile << "POSITION=14" << std::endl;
+ newstatefile.close();
+ REQUIRE_THROWS(testController.runSession(plan, true));
+}
+ SECTION("No current filename and partial path") { // FILENAME has no directory component; the run is expected to throw
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
+ newstatefile << "POSITION=14" << std::endl;
+ newstatefile.close();
+ REQUIRE_THROWS(testController.runSession(plan, true));
+}
+
+// Clean up: delete the tailed file and the per-processor state file (STATE_FILE + "." + processor UUID).
+ remove(TMP_FILE);
+
+ remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
}
TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
@@ -89,6 +327,7 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto id = tailfile->getUUIDStr();
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
@@ -124,7 +363,7 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
LogTestController::getInstance().reset();
// Delete the test and state file.
- remove(STATE_FILE);
+ remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
}
TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
@@ -133,6 +372,7 @@ TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto id = tailfile->getUUIDStr();
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
@@ -293,141 +533,3 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
}
-/*
- TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
-
- TestController testController;
- LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
- LogTestController::getInstance().setDebug<core::ProcessSession>();
- LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "TailFile successful output"));
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
-
- connection->setSourceUUID(processoruuid);
-
- processor->addConnection(connection);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
-
- core::ProcessSession session(&context);
-
- REQUIRE(processor->getName() == "tailfile");
-
- core::ProcessSessionFactory factory(&context);
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles
-
- LogTestController::getInstance().reset();
- } catch (...) {
- }
-
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
- }
-
-
- TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
-
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "TailFile successful output"));
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
- connection->setSourceUUID(processoruuid);
-
- processor->addConnection(connection);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
-
- core::ProcessSession session(&context);
-
- REQUIRE(processor->getName() == "tailfile");
-
- core::ProcessSessionFactory factory(&context);
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 2);
-
- LogTestController::getInstance().reset();
- } catch (...) {
- }
-
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
- }
- */
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index f496b3f..be42952 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -149,11 +149,13 @@ bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const st
}
}
-void TestPlan::reset() {
+void TestPlan::reset(bool reschedule) {
std::lock_guard<std::recursive_mutex> guard(mutex);
process_sessions_.clear();
factories_.clear();
location = -1;
+ if (reschedule)
+ configured_processors_.clear();
for (auto proc : processor_queue_) {
while (proc->getActiveTasks() > 0) {
proc->decrementActiveTask();
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 21a2f43..71013c6 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -192,7 +192,7 @@ class TestPlan {
bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
- void reset();
+ void reset(bool reschedule = false);
bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);