You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/09 16:54:35 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1249 - Restore
agent from persisted repo
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9a22c MINIFICPP-1249 - Restore agent from persisted repo
0f9a22c is described below
commit 0f9a22cb2734fff261963df0d04dd18374d136cd
Author: Adam Debreceni <ad...@192.168.0.205>
AuthorDate: Tue Jun 9 11:52:09 2020 +0200
MINIFICPP-1249 - Restore agent from persisted repo
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #804
---
.../standard-processors/processors/GetFile.h | 12 ++--
libminifi/include/Connection.h | 8 +--
libminifi/include/core/Connectable.h | 2 +-
libminifi/test/rocksdb-tests/RepoTests.cpp | 74 ++++++++++++++++++++++
4 files changed, 85 insertions(+), 11 deletions(-)
diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h
index 5e9b04a..837e14b 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -66,11 +66,11 @@ class GetFileMetrics : public state::response::ResponseNode {
virtual ~GetFileMetrics() {
}
- virtual std::string getName() const {
+ std::string getName() const override {
return core::Connectable::getName();
}
- virtual std::vector<state::response::SerializedResponseNode> serialize() {
+ std::vector<state::response::SerializedResponseNode> serialize() override {
std::vector<state::response::SerializedResponseNode> resp;
state::response::SerializedResponseNode iter;
@@ -143,23 +143,23 @@ class GetFile : public core::Processor, public state::response::MetricsNodeSourc
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
/**
* Execution trigger for the GetFile Processor
* @param context processor context
* @param session processor session reference.
*/
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
// Initialize, over write by NiFi GetFile
- virtual void initialize(void);
+ void initialize(void) override;
/**
* performs a listing on the directory.
* @param request get file request.
*/
void performListing(const GetFileRequest &request);
- int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector);
+ int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) override;
protected:
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index ed452e4..ed4349c 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -152,7 +152,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
uint64_t getQueueDataSize() {
return queued_data_size_;
}
- void put(std::shared_ptr<core::Connectable> flow) {
+ void put(std::shared_ptr<core::Connectable> flow) override {
std::shared_ptr<core::FlowFile> ff = std::static_pointer_cast<core::FlowFile>(flow);
if (nullptr != ff) {
put(ff);
@@ -169,15 +169,15 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
// Drain the flow records
void drain();
- void yield() {
+ void yield() override {
}
- bool isWorkAvailable() {
+ bool isWorkAvailable() override {
return !isEmpty();
}
- bool isRunning() {
+ bool isRunning() override {
return true;
}
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 82a9878..485732d 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -74,7 +74,7 @@ class Connectable : public CoreComponent {
*/
std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
- void put(std::shared_ptr<Connectable> flow) {
+ virtual void put(std::shared_ptr<Connectable> flow) {
}
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 34e0a39..e970c40 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -252,3 +252,77 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {  // a FlowFile polled out of a Connection must be back in it after the FlowFileRepository is started
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+ LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/var/tmp/testRepo.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ std::shared_ptr<minifi::Connection> input = std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+ auto root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+ root->addConnection(input);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ std::string data = "banana";
+ minifi::io::DataStream content(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+
+ /**
+ * Currently it is the Connection's responsibility to persist the incoming
+ * flowFiles to the FlowFileRepository. Upon restart the FlowFileRepository
+ * checks the persisted database and moves every FlowFile into the Connection
+ * that persisted it, if it can find it. (We could have a different flow, in
+ * which case the orphan FlowFiles are deleted.)
+ */
+ {
+ std::shared_ptr<core::Processor> processor = std::make_shared<core::Processor>("dummy");
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+ sessionGenFlowFile.importFrom(content, flow);
+ input->put(flow); // stores it in the flowFileRepository
+ }
+
+ // remove the flow from the connection; it is still present in the
+ // flowFileRepo
+ std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+ auto oldFlow = input->poll(expiredFiles);
+ REQUIRE(oldFlow);
+ REQUIRE(expiredFiles.empty());
+
+ // this notifies the FlowFileRepository of the flow structure
+ // i.e. what Connections are present (more precisely what Connectables
+ // are present)
+ flowController->load(root);
+ // this will first check the persisted repo and restore all FlowFiles
+ // that still have an owner Connectable
+ ff_repository->start();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds{500});  // NOTE(review): fixed wait, presumably to let the restore finish before polling - confirm
+
+ // check if the @input Connection's FlowFile was restored
+ // upon the FlowFileRepository's startup
+ auto newFlow = input->poll(expiredFiles);
+ REQUIRE(newFlow);
+ REQUIRE(expiredFiles.empty());
+
+ LogTestController::getInstance().reset();
+}