You are viewing a plain-text version of this content. Its canonical link was a hyperlink in the original page and is not preserved in this text.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/08/26 22:40:35 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1339 - Do not ignore errors, or run silent rollbacks.
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 104e019 MINIFICPP-1339 - Do not ignore errors, or run silent rollbacks.
104e019 is described below
commit 104e0197573791538605c23ec5ea53ef5ade0915
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Mon Aug 17 13:52:30 2020 +0200
MINIFICPP-1339 - Do not ignore errors, or run silent rollbacks.
Add wrapper that automatically reopens the rocksdb repo
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #877
---
.../rocksdb-repos/DatabaseContentRepository.cpp | 29 ++--
.../rocksdb-repos/DatabaseContentRepository.h | 4 +-
extensions/rocksdb-repos/FlowFileRepository.cpp | 68 ++++++----
extensions/rocksdb-repos/FlowFileRepository.h | 36 ++---
extensions/rocksdb-repos/RocksDatabase.cpp | 149 +++++++++++++++++++++
extensions/rocksdb-repos/RocksDatabase.h | 111 +++++++++++++++
extensions/rocksdb-repos/RocksDbStream.cpp | 13 +-
extensions/rocksdb-repos/RocksDbStream.h | 6 +-
.../RocksDbPersistableKeyValueStoreService.cpp | 51 +++++--
.../RocksDbPersistableKeyValueStoreService.h | 3 +-
.../tests/unit/ProcessorTests.cpp | 14 +-
libminifi/src/core/ProcessSession.cpp | 28 ++--
libminifi/src/core/ProcessSessionReadCallback.cpp | 7 +-
libminifi/src/io/FileStream.cpp | 4 +-
14 files changed, 424 insertions(+), 99 deletions(-)
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index bdcc3e5..62f63aa 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -21,6 +21,7 @@
#include <string>
#include "RocksDbStream.h"
#include "rocksdb/merge_operator.h"
+#include "utils/GeneralUtils.h"
namespace org {
namespace apache {
@@ -43,8 +44,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
options.merge_operator = std::make_shared<StringAppender>();
options.error_if_exists = false;
options.max_successive_merges = 0;
- rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
- if (status.ok()) {
+ db_ = utils::make_unique<minifi::internal::RocksDatabase>(options, directory_);
+ if (db_->open()) {
logger_->log_debug("NiFi Content DB Repository database open %s success", directory_);
is_valid_ = true;
} else {
@@ -55,10 +56,12 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
}
void DatabaseContentRepository::stop() {
if (db_) {
- db_->FlushWAL(true);
- delete db_;
- db_ = nullptr;
+ auto opendb = db_->open();
+ if (opendb) {
+ opendb->FlushWAL(true);
+ }
}
+ db_.reset();
}
std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
@@ -67,7 +70,7 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shar
if (nullptr == claim || !is_valid_ || !db_)
return nullptr;
// append is already supported in all modes
- return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, true);
+ return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
}
std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
@@ -75,13 +78,17 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::share
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
if (nullptr == claim || !is_valid_ || !db_)
return nullptr;
- return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, false);
+ return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
}
bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
std::string value;
rocksdb::Status status;
- status = db_->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
+ status = opendb->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
if (status.ok()) {
logger_->log_debug("%s exists", streamId->getContentFullPath());
return true;
@@ -94,8 +101,12 @@ bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceCla
bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
if (nullptr == claim || !is_valid_ || !db_)
return false;
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
rocksdb::Status status;
- status = db_->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
+ status = opendb->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
if (status.ok()) {
logger_->log_debug("Deleted %s", claim->getContentFullPath());
return true;
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 50a6c8c..85e2f7d 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -25,6 +25,8 @@
#include "core/ContentRepository.h"
#include "properties/Configure.h"
#include "core/logging/LoggerConfiguration.h"
+#include "RocksDatabase.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -117,7 +119,7 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C
private:
bool is_valid_;
- rocksdb::DB* db_;
+ std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index d04671c..57d3ab6 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -37,6 +37,10 @@ namespace core {
namespace repository {
void FlowFileRepository::flush() {
+ auto opendb = db_->open();
+ if (!opendb) {
+ return;
+ }
rocksdb::WriteBatch batch;
rocksdb::ReadOptions options;
@@ -54,7 +58,7 @@ void FlowFileRepository::flush() {
}
}
- auto multistatus = db_->MultiGet(options, keys, &values);
+ auto multistatus = opendb->MultiGet(options, keys, &values);
for(size_t i=0; i<keys.size() && i<values.size() && i<multistatus.size(); ++i) {
if(!multistatus[i].ok()) {
@@ -71,7 +75,7 @@ void FlowFileRepository::flush() {
batch.Delete(keys[i]);
}
- auto operation = [this, &batch]() { return db_->Write(rocksdb::WriteOptions(), &batch); };
+ auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
if (!ExecuteWithRetry(operation)) {
for (const auto& key: keystrings) {
@@ -91,14 +95,18 @@ void FlowFileRepository::flush() {
}
void FlowFileRepository::printStats() {
+ auto opendb = db_->open();
+ if (!opendb) {
+ return;
+ }
std::string key_count;
- db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
+ opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);
std::string table_readers;
- db_->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+ opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
std::string all_memtables;
- db_->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+ opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s",
key_count, table_readers, all_memtables);
@@ -122,26 +130,27 @@ void FlowFileRepository::run() {
}
void FlowFileRepository::prune_stored_flowfiles() {
- rocksdb::DB* used_database;
- std::unique_ptr<rocksdb::DB> stored_database;
- bool corrupt_checkpoint = false;
+ rocksdb::Options options;
+ options.create_if_missing = true;
+ options.use_direct_io_for_flush_and_compaction = true;
+ options.use_direct_reads = true;
+ minifi::internal::RocksDatabase checkpointDB(options, FLOWFILE_CHECKPOINT_DIRECTORY, minifi::internal::RocksDatabase::Mode::ReadOnly);
+ utils::optional<minifi::internal::OpenRocksDB> opendb;
if (nullptr != checkpoint_) {
- rocksdb::Options options;
- options.create_if_missing = true;
- options.use_direct_io_for_flush_and_compaction = true;
- options.use_direct_reads = true;
- rocksdb::Status status = rocksdb::DB::OpenForReadOnly(options, FLOWFILE_CHECKPOINT_DIRECTORY, &used_database);
- if (status.ok()) {
- stored_database.reset(used_database);
- } else {
- used_database = db_;
+ opendb = checkpointDB.open();
+ if (!opendb) {
+ opendb = db_->open();
+ }
+ if (!opendb) {
+ logger_->log_trace("Could not open neither the checkpoint nor the live database.");
+ return;
}
} else {
logger_->log_trace("Could not open checkpoint as object doesn't exist. Likely not needed or file system error.");
return;
}
- rocksdb::Iterator* it = used_database->NewIterator(rocksdb::ReadOptions());
+ auto it = opendb->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
std::string key = it->key().ToString();
@@ -155,7 +164,7 @@ void FlowFileRepository::prune_stored_flowfiles() {
search = connectionMap.find(eventRead->getConnectionUuid());
found = (search != connectionMap.end());
}
- if (!corrupt_checkpoint && found) {
+ if (found) {
// we find the connection for the persistent flowfile, create the flowfile and enqueue that
std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
eventRead->setStoredToRepository(true);
@@ -171,8 +180,6 @@ void FlowFileRepository::prune_stored_flowfiles() {
keys_to_delete.enqueue(key);
}
}
-
- delete it;
}
bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> operation) {
@@ -194,22 +201,25 @@ bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> opera
* Returns True if there is data to interrogate.
* @return true if our db has data stored.
*/
-bool FlowFileRepository::need_checkpoint(){
- std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- return true;
- }
- return false;
+bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDB& opendb){
+ auto it = opendb.NewIterator(rocksdb::ReadOptions());
+ it->SeekToFirst();
+ return it->Valid();
}
void FlowFileRepository::initialize_repository() {
+ auto opendb = db_->open();
+ if (!opendb) {
+ logger_->log_trace("Couldn't open database, no way to checkpoint");
+ return;
+ }
// first we need to establish a checkpoint iff it is needed.
- if (!need_checkpoint()){
+ if (!need_checkpoint(*opendb)){
logger_->log_trace("Do not need checkpoint");
return;
}
rocksdb::Checkpoint *checkpoint;
// delete any previous copy
- if (utils::file::FileUtils::delete_dir(FLOWFILE_CHECKPOINT_DIRECTORY) >= 0 && rocksdb::Checkpoint::Create(db_, &checkpoint).ok()) {
+ if (utils::file::FileUtils::delete_dir(FLOWFILE_CHECKPOINT_DIRECTORY) >= 0 && opendb->NewCheckpoint(&checkpoint).ok()) {
if (checkpoint->CreateCheckpoint(FLOWFILE_CHECKPOINT_DIRECTORY).ok()) {
checkpoint_ = std::unique_ptr<rocksdb::Checkpoint>(checkpoint);
logger_->log_trace("Created checkpoint directory");
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 9f0650b..20581d4 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -28,6 +28,7 @@
#include "Connection.h"
#include "core/logging/LoggerConfiguration.h"
#include "concurrentqueue.h"
+#include "RocksDatabase.h"
namespace org {
namespace apache {
@@ -70,12 +71,6 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
db_ = NULL;
}
- // Destructor
- ~FlowFileRepository() {
- if (db_)
- delete db_;
- }
-
virtual bool isNoop() {
return false;
}
@@ -116,25 +111,34 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
options.write_buffer_size = 8 << 20;
options.max_write_buffer_number = 20;
options.min_write_buffer_number_to_merge = 1;
- rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
- if (status.ok()) {
+ db_ = utils::make_unique<minifi::internal::RocksDatabase>(options, directory_);
+ if (db_->open()) {
logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
+ return true;
} else {
logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
+ return false;
}
- return status.ok();
}
virtual void run();
virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
// persistent to the DB
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
rocksdb::Slice value((const char *) buf, bufLen);
- auto operation = [this, &key, &value]() { return db_->Put(rocksdb::WriteOptions(), key, value); };
+ auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); };
return ExecuteWithRetry(operation);
}
virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
rocksdb::WriteBatch batch;
for (const auto &item: data) {
rocksdb::Slice value((const char *) item.second->getBuffer(), item.second->getSize());
@@ -143,7 +147,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
return false;
}
}
- auto operation = [this, &batch]() { return db_->Write(rocksdb::WriteOptions(), &batch); };
+ auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
return ExecuteWithRetry(operation);
}
@@ -162,9 +166,11 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
* @return status of the get operation.
*/
virtual bool Get(const std::string &key, std::string &value) {
- if (db_ == nullptr)
+ auto opendb = db_->open();
+ if (!opendb) {
return false;
- return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
+ }
+ return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
}
virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
@@ -194,7 +200,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
* Returns true if a checkpoint is needed at startup
* @return true if a checkpoint is needed.
*/
- bool need_checkpoint();
+ bool need_checkpoint(minifi::internal::OpenRocksDB& opendb);
/**
* Prunes stored flow files.
@@ -203,7 +209,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
moodycamel::ConcurrentQueue<std::string> keys_to_delete;
std::shared_ptr<core::ContentRepository> content_repo_;
- rocksdb::DB* db_;
+ std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/rocksdb-repos/RocksDatabase.cpp b/extensions/rocksdb-repos/RocksDatabase.cpp
new file mode 100644
index 0000000..290b2c6
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDatabase.cpp
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDatabase.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace internal {
+
+// Note on the out-of-space checks in this file: rocksdb::Status equality compares only the
+// top-level status code (not the subcode), so `result == rocksdb::Status::NoSpace()` would be
+// true for every IOError. IsNoSpace() also checks the kNoSpace subcode, so only a genuinely
+// full disk invalidates the database; other I/O errors are just reported to the caller.
+
+OpenRocksDB::OpenRocksDB(RocksDatabase& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl) : db_(&db), impl_(std::move(impl)) {}
+
+// Forwards to rocksdb::DB::Put; invalidates the owning RocksDatabase if the disk is full.
+rocksdb::Status OpenRocksDB::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
+  rocksdb::Status result = impl_->Put(options, key, value);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+// Forwards to rocksdb::DB::Get; invalidates the owning RocksDatabase if the disk is full.
+rocksdb::Status OpenRocksDB::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) {
+  rocksdb::Status result = impl_->Get(options, key, value);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+// Forwards to rocksdb::DB::MultiGet; invalidates the owning RocksDatabase (once) if any
+// of the per-key statuses reports a full disk.
+std::vector<rocksdb::Status> OpenRocksDB::MultiGet(const rocksdb::ReadOptions& options, const std::vector<rocksdb::Slice>& keys, std::vector<std::string>* values) {
+  std::vector<rocksdb::Status> results = impl_->MultiGet(options, keys, values);
+  for (const auto& result : results) {
+    if (result.IsNoSpace()) {
+      db_->invalidate();
+      break;
+    }
+  }
+  return results;
+}
+
+// Forwards to rocksdb::DB::Write; invalidates the owning RocksDatabase if the disk is full.
+rocksdb::Status OpenRocksDB::Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates) {
+  rocksdb::Status result = impl_->Write(options, updates);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+// Forwards to rocksdb::DB::Delete; invalidates the owning RocksDatabase if the disk is full.
+rocksdb::Status OpenRocksDB::Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key) {
+  rocksdb::Status result = impl_->Delete(options, key);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+// Forwards to rocksdb::DB::Merge; invalidates the owning RocksDatabase if the disk is full.
+rocksdb::Status OpenRocksDB::Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
+  rocksdb::Status result = impl_->Merge(options, key, value);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+// Forwards to rocksdb::DB::GetProperty.
+bool OpenRocksDB::GetProperty(const rocksdb::Slice& property, std::string* value) {
+  return impl_->GetProperty(property, value);
+}
+
+// Forwards to rocksdb::DB::NewIterator, taking ownership of the returned iterator.
+std::unique_ptr<rocksdb::Iterator> OpenRocksDB::NewIterator(const rocksdb::ReadOptions& options) {
+  return std::unique_ptr<rocksdb::Iterator>{impl_->NewIterator(options)};
+}
+
+// Creates a rocksdb::Checkpoint object for this database; on success the caller owns *checkpoint.
+rocksdb::Status OpenRocksDB::NewCheckpoint(rocksdb::Checkpoint **checkpoint) {
+  return rocksdb::Checkpoint::Create(impl_.get(), checkpoint);
+}
+
+// Forwards to rocksdb::DB::FlushWAL; invalidates the owning RocksDatabase if the disk is full.
+rocksdb::Status OpenRocksDB::FlushWAL(bool sync) {
+  rocksdb::Status result = impl_->FlushWAL(sync);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+// Returns the raw database; it stays valid while this handle is alive because the handle
+// holds a shared reference to it.
+rocksdb::DB* OpenRocksDB::get() {
+  return impl_.get();
+}
+
+std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ = core::logging::LoggerFactory<RocksDatabase>::getLogger();
+
+RocksDatabase::RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode) : open_options_(options), db_name_(name), mode_(mode) {}
+
+/*
+ * Drops this object's reference to the open rocksdb::DB so that the next open() tries to reopen it.
+ * OpenRocksDB handles returned earlier keep their own shared reference, so the old instance is
+ * only closed once all of those handles have been destroyed.
+ */
+void RocksDatabase::invalidate() {
+  std::lock_guard<std::mutex> db_guard{ mtx_ };
+  // discard our own instance
+  impl_.reset();
+}
+
+/*
+ * Returns a handle to the database, opening it first (read-write or read-only, depending on mode_)
+ * if it is not currently open. Returns an empty optional, after logging an error, if opening fails.
+ */
+utils::optional<OpenRocksDB> RocksDatabase::open() {
+  std::lock_guard<std::mutex> db_guard{ mtx_ };
+  if (!impl_) {
+    // database is not opened yet
+    rocksdb::DB* db_instance = nullptr;
+    rocksdb::Status result;
+    switch (mode_) {
+      case Mode::ReadWrite:
+        result = rocksdb::DB::Open(open_options_, db_name_, &db_instance);
+        if (!result.ok()) {
+          logger_->log_error("Cannot open writable rocksdb database %s, error: %s", db_name_, result.ToString());
+        }
+        break;
+      case Mode::ReadOnly:
+        result = rocksdb::DB::OpenForReadOnly(open_options_, db_name_, &db_instance);
+        if (!result.ok()) {
+          logger_->log_error("Cannot open read-only rocksdb database %s, error: %s", db_name_, result.ToString());
+        }
+        break;
+    }
+    if (db_instance != nullptr && result.ok()) {
+      impl_.reset(db_instance);
+    } else {
+      // we failed to open the database
+      return utils::nullopt;
+    }
+  }
+  return OpenRocksDB(*this, gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(impl_));
+}
+
+} /* namespace internal */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/rocksdb-repos/RocksDatabase.h b/extensions/rocksdb-repos/RocksDatabase.h
new file mode 100644
index 0000000..e67520b
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDatabase.h
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "utils/OptionalUtils.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace internal {
+
+class RocksDatabase;
+
+/*
+ * A handle to an open RocksDB instance, obtainable only through RocksDatabase::open().
+ *
+ * The handle holds its own shared reference to the underlying rocksdb::DB, so that instance stays
+ * alive as long as the handle does, even if the owning RocksDatabase is invalidated meanwhile.
+ * The owning RocksDatabase is referenced without ownership and must outlive the handle.
+ *
+ * Not thread safe.
+ */
+class OpenRocksDB {
+  friend class RocksDatabase;
+
+  OpenRocksDB(RocksDatabase& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl);
+
+ public:
+  OpenRocksDB(const OpenRocksDB&) = delete;
+  OpenRocksDB(OpenRocksDB&&) noexcept = default;
+  OpenRocksDB& operator=(const OpenRocksDB&) = delete;
+  OpenRocksDB& operator=(OpenRocksDB&&) noexcept = default;
+
+  // The operations below forward to the rocksdb::DB member of the same name. Put, Get, MultiGet,
+  // Write, Delete, Merge and FlushWAL additionally invalidate the owning RocksDatabase when the
+  // operation reports an out-of-space error, so that a later RocksDatabase::open() reopens it.
+
+  rocksdb::Status Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value);
+
+  rocksdb::Status Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value);
+
+  std::vector<rocksdb::Status> MultiGet(const rocksdb::ReadOptions& options, const std::vector<rocksdb::Slice>& keys, std::vector<std::string>* values);
+
+  rocksdb::Status Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates);
+
+  rocksdb::Status Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key);
+
+  rocksdb::Status Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value);
+
+  bool GetProperty(const rocksdb::Slice& property, std::string* value);
+
+  // The returned iterator must be destroyed before this handle.
+  std::unique_ptr<rocksdb::Iterator> NewIterator(const rocksdb::ReadOptions& options);
+
+  // Creates a rocksdb::Checkpoint for this database; on success the caller owns *checkpoint.
+  rocksdb::Status NewCheckpoint(rocksdb::Checkpoint** checkpoint);
+
+  rocksdb::Status FlushWAL(bool sync);
+
+  // Raw access to the underlying database; valid while this handle is alive.
+  rocksdb::DB* get();
+
+ private:
+  gsl::not_null<RocksDatabase*> db_;  // non-owning back-reference, used for invalidation
+  gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
+};
+
+/*
+ * Remembers how to open a RocksDB database (options, path, read-only or read-write) and opens it
+ * lazily through open(). After invalidation the next open() tries to reopen the database.
+ * open() and invalidate() are serialized by an internal mutex.
+ */
+class RocksDatabase {
+  friend class OpenRocksDB;
+
+ public:
+  enum class Mode {
+    ReadOnly,
+    ReadWrite
+  };
+
+  RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode = Mode::ReadWrite);
+
+  // open() is virtual (presumably so tests can substitute it), so the class may be deleted
+  // through a base pointer and needs a virtual destructor.
+  virtual ~RocksDatabase() = default;
+
+  /*
+   * Returns a handle to the database, opening it first if it is not currently open.
+   * Returns an empty optional if the database cannot be opened.
+   */
+  virtual utils::optional<OpenRocksDB> open();
+
+ private:
+  /*
+   * notify RocksDatabase that the next open should check if they can reopen the database
+   * until a successful reopen no more open is possible
+   */
+  void invalidate();
+
+  const rocksdb::Options open_options_;
+  const std::string db_name_;
+  const Mode mode_;
+
+  std::mutex mtx_;  // guards impl_
+  std::shared_ptr<rocksdb::DB> impl_;  // empty while the database is not open
+
+  static std::shared_ptr<core::logging::Logger> logger_;
+};
+
+} /* namespace internal */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index a6526ec..decb030 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -30,13 +30,14 @@ namespace nifi {
namespace minifi {
namespace io {
-RocksDbStream::RocksDbStream(std::string path, rocksdb::DB *db, bool write_enable)
+RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable)
: BaseStream(),
path_(std::move(path)),
write_enable_(write_enable),
- db_(db),
+ db_(std::move(db)),
logger_(logging::LoggerFactory<RocksDbStream>::getLogger()) {
- exists_ = db_->Get(rocksdb::ReadOptions(), path_, &value_).ok();
+ auto opendb = db_->open();
+ exists_ = opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok();
offset_ = 0;
size_ = value_.size();
}
@@ -63,12 +64,16 @@ int RocksDbStream::writeData(std::vector<uint8_t> &buf, int buflen) {
int RocksDbStream::writeData(uint8_t *value, int size) {
if (!IsNullOrEmpty(value) && write_enable_) {
+ auto opendb = db_->open();
+ if (!opendb) {
+ return -1;
+ }
rocksdb::Slice slice_value((const char *) value, size);
rocksdb::Status status;
size_ += size;
rocksdb::WriteOptions opts;
opts.sync = true;
- db_->Merge(opts, path_, slice_value);
+ opendb->Merge(opts, path_, slice_value);
if (status.ok()) {
return 0;
} else {
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 57f5c4e..c62d27b 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -18,7 +18,7 @@
#ifndef LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
#define LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
-#include "rocksdb/db.h"
+#include "RocksDatabase.h"
#include <iostream>
#include <cstdint>
#include <string>
@@ -46,7 +46,7 @@ class RocksDbStream : public io::BaseStream {
* File Stream constructor that accepts an fstream shared pointer.
* It must already be initialized for read and write.
*/
- explicit RocksDbStream(std::string path, rocksdb::DB *db, bool write_enable = false);
+ explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false);
~RocksDbStream() override {
closeStream();
@@ -160,7 +160,7 @@ class RocksDbStream : public io::BaseStream {
std::string value_;
- rocksdb::DB *db_;
+ gsl::not_null<minifi::internal::RocksDatabase*> db_;
size_t size_;
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
index ef04230..120f8e3 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -75,13 +75,12 @@ void RocksDbPersistableKeyValueStoreService::onEnable() {
if (!always_persist_) {
options.manual_wal_flush = true;
}
- rocksdb::DB* db = nullptr;
- rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
- if (status.ok()) {
- db_.reset(db);
+ db_ = utils::make_unique<minifi::internal::RocksDatabase>(options, directory_);
+ if (db_->open()) {
logger_->log_trace("Successfully opened RocksDB database at %s", directory_.c_str());
} else {
- logger_->log_error("Failed to open RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
+ // TODO(adebreceni) forward the status
+ logger_->log_error("Failed to open RocksDB database at %s, error", directory_.c_str());
return;
}
@@ -102,7 +101,11 @@ bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const s
if (!db_) {
return false;
}
- rocksdb::Status status = db_->Put(default_write_options, key, value);
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
+ rocksdb::Status status = opendb->Put(default_write_options, key, value);
if (!status.ok()) {
logger_->log_error("Failed to Put key %s to RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.getState());
return false;
@@ -114,7 +117,11 @@ bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::st
if (!db_) {
return false;
}
- rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
+ rocksdb::Status status = opendb->Get(rocksdb::ReadOptions(), key, &value);
if (!status.ok()) {
logger_->log_error("Failed to Get key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.getState());
return false;
@@ -126,8 +133,12 @@ bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string,
if (!db_) {
return false;
}
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
kvs.clear();
- std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+ auto it = opendb->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
kvs.emplace(it->key().ToString(), it->value().ToString());
}
@@ -142,7 +153,11 @@ bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
if (!db_) {
return false;
}
- rocksdb::Status status = db_->Delete(default_write_options, key);
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
+ rocksdb::Status status = opendb->Delete(default_write_options, key);
if (!status.ok()) {
logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
return false;
@@ -154,9 +169,13 @@ bool RocksDbPersistableKeyValueStoreService::clear() {
if (!db_) {
return false;
}
- std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
+ auto it = opendb->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
- rocksdb::Status status = db_->Delete(default_write_options, it->key());
+ rocksdb::Status status = opendb->Delete(default_write_options, it->key());
if (!status.ok()) {
logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
return false;
@@ -173,6 +192,10 @@ bool RocksDbPersistableKeyValueStoreService::update(const std::string& key, cons
if (!db_) {
return false;
}
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
throw std::logic_error("Unsupported method");
}
@@ -180,10 +203,14 @@ bool RocksDbPersistableKeyValueStoreService::persist() {
if (!db_) {
return false;
}
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
if (always_persist_) {
return true;
}
- return db_->FlushWAL(true /*sync*/).ok();
+ return opendb->FlushWAL(true /*sync*/).ok();
}
} /* namespace controllers */
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
index 65ea554..95f6a1c 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
@@ -22,6 +22,7 @@
#include "properties/Configure.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
+#include "../RocksDatabase.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
@@ -69,7 +70,7 @@ class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyV
protected:
std::string directory_;
- std::unique_ptr<rocksdb::DB> db_;
+ std::unique_ptr<minifi::internal::RocksDatabase> db_;
rocksdb::WriteOptions default_write_options;
private:
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index e56e301..a251b4e 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -48,10 +48,15 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
REQUIRE(processor->getName() == "processorname");
}
+// TODO(adebreceni)
+// what is this test? multiple onTriggers with the same session
+// session->get() then no commit, same repo for flowFileRepo and provenance repo
TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+ auto config = std::make_shared<minifi::Configure>();
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(config);
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
processor->initialize();
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
@@ -89,8 +94,7 @@ TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
processor->onSchedule(context, factory);
- int prev = 0;
- for (int i = 0; i < 10; i++) {
+ for (int i = 1; i < 10; i++) {
auto session = std::make_shared<core::ProcessSession>(context);
REQUIRE(processor->getName() == "getfileCreate2");
@@ -126,9 +130,11 @@ TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
}
session->commit();
std::shared_ptr<core::FlowFile> ffr = session->get();
+ REQUIRE(ffr); // GetFile successfully read the contents and created a flowFile
- REQUIRE(repo->getRepoMap().size() == (prev + 1));
- prev++;
+ // one CREATE, one MODIFY, and one FF contents, as we use the same
+ // underlying repo for both provenance and flowFileRepo
+ REQUIRE(repo->getRepoMap().size() == 3 * i);
}
}
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index df67f62..c6de75a 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -248,12 +248,10 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
// Call the callback to write the content
if (nullptr == stream) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
}
if (callback->process(stream) < 0) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
flow->setSize(stream->getSize());
@@ -284,8 +282,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
uint64_t startTime = utils::timeutils::getTimeMillis();
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
if (nullptr == stream) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
}
// Call the callback to write the content
@@ -294,8 +291,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
if (oldPos > 0)
stream->seek(oldPos + 1);
if (callback->process(stream) < 0) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
flow->setSize(stream->getSize());
@@ -330,15 +326,13 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
if (nullptr == stream) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
}
stream->seek(flow->getOffset());
if (callback->process(stream) < 0) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
@@ -365,9 +359,7 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<co
std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
if (nullptr == content_stream) {
- logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
}
size_t position = 0;
const size_t max_size = stream.getSize();
@@ -412,8 +404,7 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
if (nullptr == stream) {
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
}
if (input.is_open() && input.good()) {
bool invalidWrite = false;
@@ -532,8 +523,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
}
if (stream == nullptr) {
logger_->log_error("Stream is null");
- rollback();
- return;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
}
if (stream->write(begin, len) != len) {
logger_->log_error("Error while writing");
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index b902009..254c1f5 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -54,7 +54,9 @@ int64_t ProcessSessionReadCallback::process(std::shared_ptr<io::BaseStream> stre
if (read == 0) {
break;
}
- _tmpFileOs.write(reinterpret_cast<char*>(buffer), read);
+ if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
+ return -1;
+ }
size += read;
} while (size < stream->getSize());
_writeSucceeded = true;
@@ -69,6 +71,9 @@ bool ProcessSessionReadCallback::commit() {
logger_->log_debug("committing export operation to %s", _destFile);
if (_writeSucceeded) {
+ if (!_tmpFileOs.flush()) {
+ return false;
+ }
_tmpFileOs.close();
if (rename(_tmpFile.c_str(), _destFile.c_str())) {
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 870e926..15cc112 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -102,7 +102,9 @@ int FileStream::writeData(uint8_t *value, int size) {
if (offset_ > length_) {
length_ = offset_;
}
- file_stream_->flush();
+ if (!file_stream_->flush()) {
+ return -1;
+ }
return size;
} else {
return -1;