You are viewing a plain text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/08/26 22:40:35 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1339 - Do not ignore errors, or run silent rollbacks.

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 104e019  MINIFICPP-1339 - Do not ignore errors, or run silent rollbacks.
104e019 is described below

commit 104e0197573791538605c23ec5ea53ef5ade0915
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Mon Aug 17 13:52:30 2020 +0200

    MINIFICPP-1339 - Do not ignore errors, or run silent rollbacks.
    
    Add wrapper that automatically reopens the rocksdb repo
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #877
---
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  29 ++--
 .../rocksdb-repos/DatabaseContentRepository.h      |   4 +-
 extensions/rocksdb-repos/FlowFileRepository.cpp    |  68 ++++++----
 extensions/rocksdb-repos/FlowFileRepository.h      |  36 ++---
 extensions/rocksdb-repos/RocksDatabase.cpp         | 149 +++++++++++++++++++++
 extensions/rocksdb-repos/RocksDatabase.h           | 111 +++++++++++++++
 extensions/rocksdb-repos/RocksDbStream.cpp         |  13 +-
 extensions/rocksdb-repos/RocksDbStream.h           |   6 +-
 .../RocksDbPersistableKeyValueStoreService.cpp     |  51 +++++--
 .../RocksDbPersistableKeyValueStoreService.h       |   3 +-
 .../tests/unit/ProcessorTests.cpp                  |  14 +-
 libminifi/src/core/ProcessSession.cpp              |  28 ++--
 libminifi/src/core/ProcessSessionReadCallback.cpp  |   7 +-
 libminifi/src/io/FileStream.cpp                    |   4 +-
 14 files changed, 424 insertions(+), 99 deletions(-)

diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index bdcc3e5..62f63aa 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -21,6 +21,7 @@
 #include <string>
 #include "RocksDbStream.h"
 #include "rocksdb/merge_operator.h"
+#include "utils/GeneralUtils.h"
 
 namespace org {
 namespace apache {
@@ -43,8 +44,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
   options.merge_operator = std::make_shared<StringAppender>();
   options.error_if_exists = false;
   options.max_successive_merges = 0;
-  rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
-  if (status.ok()) {
+  db_ = utils::make_unique<minifi::internal::RocksDatabase>(options, directory_);
+  if (db_->open()) {  // probe-open once; later operations reopen on demand via db_->open()
     logger_->log_debug("NiFi Content DB Repository database open %s success", directory_);
     is_valid_ = true;
   } else {
@@ -55,10 +56,12 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
 }
 void DatabaseContentRepository::stop() {
   if (db_) {
-    db_->FlushWAL(true);
-    delete db_;
-    db_ = nullptr;
+    auto opendb = db_->open();
+    if (opendb) {
+      opendb->FlushWAL(true);
+    }
   }
+  db_.reset();  // after this, every accessor must observe a null db_
 }
 
 std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
@@ -67,7 +70,7 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shar
   if (nullptr == claim || !is_valid_ || !db_)
     return nullptr;
   // append is already supported in all modes
-  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, true);
+  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
 }
 
 std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
@@ -75,13 +78,17 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::share
   // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
   if (nullptr == claim || !is_valid_ || !db_)
     return nullptr;
-  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, false);
+  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
 }
 
 bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+  if (nullptr == streamId || !is_valid_ || !db_)  // same guard as read()/write()/remove(); db_ is null after stop()
+    return false;
+  auto opendb = db_->open();
+  if (!opendb) return false;
   std::string value;
   rocksdb::Status status;
-  status = db_->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
+  status = opendb->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
   if (status.ok()) {
     logger_->log_debug("%s exists", streamId->getContentFullPath());
     return true;
@@ -94,8 +101,12 @@ bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceCla
 bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
   if (nullptr == claim || !is_valid_ || !db_)
     return false;
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
   rocksdb::Status status;
-  status = db_->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
+  status = opendb->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
   if (status.ok()) {
     logger_->log_debug("Deleted %s", claim->getContentFullPath());
     return true;
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 50a6c8c..85e2f7d 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -25,6 +25,8 @@
 #include "core/ContentRepository.h"
 #include "properties/Configure.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "RocksDatabase.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -117,7 +119,7 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C
 
  private:
   bool is_valid_;
-  rocksdb::DB* db_;
+  std::unique_ptr<minifi::internal::RocksDatabase> db_;  // created in initialize(), released in stop()
   std::shared_ptr<logging::Logger> logger_;
 };
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index d04671c..57d3ab6 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -37,6 +37,10 @@ namespace core {
 namespace repository {
 
 void FlowFileRepository::flush() {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return;
+  }
   rocksdb::WriteBatch batch;
   rocksdb::ReadOptions options;
 
@@ -54,7 +58,7 @@ void FlowFileRepository::flush() {
     }
   }
 
-  auto multistatus = db_->MultiGet(options, keys, &values);
+  auto multistatus = opendb->MultiGet(options, keys, &values);
 
   for(size_t i=0; i<keys.size() && i<values.size() && i<multistatus.size(); ++i) {
     if(!multistatus[i].ok()) {
@@ -71,7 +75,7 @@ void FlowFileRepository::flush() {
     batch.Delete(keys[i]);
   }
 
-  auto operation = [this, &batch]() { return db_->Write(rocksdb::WriteOptions(), &batch); };
+  auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
 
   if (!ExecuteWithRetry(operation)) {
     for (const auto& key: keystrings) {
@@ -91,14 +95,18 @@ void FlowFileRepository::flush() {
 }
 
 void FlowFileRepository::printStats() {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return;
+  }
   std::string key_count;
-  db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
+  opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);
 
   std::string table_readers;
-  db_->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+  opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
 
   std::string all_memtables;
-  db_->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+  opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
 
   logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s",
       key_count, table_readers, all_memtables);
@@ -122,26 +130,27 @@ void FlowFileRepository::run() {
 }
 
 void FlowFileRepository::prune_stored_flowfiles() {
-  rocksdb::DB* used_database;
-  std::unique_ptr<rocksdb::DB> stored_database;
-  bool corrupt_checkpoint = false;
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  options.use_direct_io_for_flush_and_compaction = true;
+  options.use_direct_reads = true;
+  minifi::internal::RocksDatabase checkpointDB(options, FLOWFILE_CHECKPOINT_DIRECTORY, minifi::internal::RocksDatabase::Mode::ReadOnly);
+  utils::optional<minifi::internal::OpenRocksDB> opendb;  // declared after checkpointDB so it is destroyed first
   if (nullptr != checkpoint_) {
-    rocksdb::Options options;
-    options.create_if_missing = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-    options.use_direct_reads = true;
-    rocksdb::Status status = rocksdb::DB::OpenForReadOnly(options, FLOWFILE_CHECKPOINT_DIRECTORY, &used_database);
-    if (status.ok()) {
-      stored_database.reset(used_database);
-    } else {
-      used_database = db_;
+    opendb = checkpointDB.open();  // prefer the read-only checkpoint; fall back to the live database below
+    if (!opendb) {
+      opendb = db_->open();
+    }
+    if (!opendb) {
+      logger_->log_trace("Could not open either the checkpoint or the live database.");
+      return;
     }
   } else {
     logger_->log_trace("Could not open checkpoint as object doesn't exist. Likely not needed or file system error.");
     return;
   }
 
-  rocksdb::Iterator* it = used_database->NewIterator(rocksdb::ReadOptions());
+  auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
     std::string key = it->key().ToString();
@@ -155,7 +164,7 @@ void FlowFileRepository::prune_stored_flowfiles() {
         search = connectionMap.find(eventRead->getConnectionUuid());
         found = (search != connectionMap.end());
       }
-      if (!corrupt_checkpoint && found) {
+      if (found) {
         // we find the connection for the persistent flowfile, create the flowfile and enqueue that
         std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
         eventRead->setStoredToRepository(true);
@@ -171,8 +180,6 @@ void FlowFileRepository::prune_stored_flowfiles() {
       keys_to_delete.enqueue(key);
     }
   }
-
-  delete it;
 }
 
 bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> operation) {
@@ -194,22 +201,25 @@ bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> opera
  * Returns True if there is data to interrogate.
  * @return true if our db has data stored.
  */
-bool FlowFileRepository::need_checkpoint(){
-  std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
-  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    return true;
-  }
-  return false;
+bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDB& opendb){
+  auto it = opendb.NewIterator(rocksdb::ReadOptions());
+  it->SeekToFirst();
+  return it->Valid();  // any first key means there is stored data
 }
 void FlowFileRepository::initialize_repository() {
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_trace("Couldn't open database, no way to checkpoint");
+    return;
+  }
   // first we need to establish a checkpoint iff it is needed.
-  if (!need_checkpoint()){
+  if (!need_checkpoint(*opendb)){
     logger_->log_trace("Do not need checkpoint");
     return;
   }
   rocksdb::Checkpoint *checkpoint;
   // delete any previous copy
-  if (utils::file::FileUtils::delete_dir(FLOWFILE_CHECKPOINT_DIRECTORY) >= 0 && rocksdb::Checkpoint::Create(db_, &checkpoint).ok()) {
+  if (utils::file::FileUtils::delete_dir(FLOWFILE_CHECKPOINT_DIRECTORY) >= 0 && opendb->NewCheckpoint(&checkpoint).ok()) {
     if (checkpoint->CreateCheckpoint(FLOWFILE_CHECKPOINT_DIRECTORY).ok()) {
       checkpoint_ = std::unique_ptr<rocksdb::Checkpoint>(checkpoint);
       logger_->log_trace("Created checkpoint directory");
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 9f0650b..20581d4 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -28,6 +28,7 @@
 #include "Connection.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "concurrentqueue.h"
+#include "RocksDatabase.h"
 
 namespace org {
 namespace apache {
@@ -70,12 +71,6 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
     db_ = NULL;
   }
 
-  // Destructor
-  ~FlowFileRepository() {
-    if (db_)
-      delete db_;
-  }
-
   virtual bool isNoop() {
     return false;
   }
@@ -116,25 +111,34 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
     options.write_buffer_size = 8 << 20;
     options.max_write_buffer_number = 20;
     options.min_write_buffer_number_to_merge = 1;
-    rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
-    if (status.ok()) {
+    db_ = utils::make_unique<minifi::internal::RocksDatabase>(options, directory_);
+    if (db_->open()) {
       logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
+      return true;
     } else {
       logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
+      return false;
     }
-    return status.ok();
   }
 
   virtual void run();
 
   virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     // persistent to the DB
+    auto opendb = db_ ? db_->open() : utils::nullopt;  // db_ stays null until initialize() creates it
+    if (!opendb) {
+      return false;
+    }
     rocksdb::Slice value((const char *) buf, bufLen);
-    auto operation = [this, &key, &value]() { return db_->Put(rocksdb::WriteOptions(), key, value); };
+    auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); };
     return ExecuteWithRetry(operation);
   }
 
   virtual bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::DataStream>>>& data) {
+    auto opendb = db_ ? db_->open() : utils::nullopt;  // db_ stays null until initialize() creates it
+    if (!opendb) {
+      return false;
+    }
     rocksdb::WriteBatch batch;
     for (const auto &item: data) {
       rocksdb::Slice value((const char *) item.second->getBuffer(), item.second->getSize());
@@ -143,7 +147,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
         return false;
       }
     }
-    auto operation = [this, &batch]() { return db_->Write(rocksdb::WriteOptions(), &batch); };
+    auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
     return ExecuteWithRetry(operation);
   }
 
@@ -162,9 +166,11 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
    * @return status of the get operation.
    */
   virtual bool Get(const std::string &key, std::string &value) {
-    if (db_ == nullptr)
+    auto opendb = db_ ? db_->open() : utils::nullopt;  // keeps the former db_ == nullptr check
+    if (!opendb) {
       return false;
-    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
+    }
+    return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
   }
 
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
@@ -194,7 +200,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
    * Returns true if a checkpoint is needed at startup
    * @return true if a checkpoint is needed.
    */
-  bool need_checkpoint();
+  bool need_checkpoint(minifi::internal::OpenRocksDB& opendb);
 
   /**
    * Prunes stored flow files.
@@ -203,7 +209,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
 
   moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   std::shared_ptr<core::ContentRepository> content_repo_;
-  rocksdb::DB* db_;
+  std::unique_ptr<minifi::internal::RocksDatabase> db_;  // null until initialize()
   std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/extensions/rocksdb-repos/RocksDatabase.cpp b/extensions/rocksdb-repos/RocksDatabase.cpp
new file mode 100644
index 0000000..290b2c6
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDatabase.cpp
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDatabase.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace internal {
+
+OpenRocksDB::OpenRocksDB(RocksDatabase& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl) : db_(&db), impl_(std::move(impl)) {}
+
+rocksdb::Status OpenRocksDB::Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
+  rocksdb::Status result = impl_->Put(options, key, value);
+  if (result.IsNoSpace()) {  // IsNoSpace() checks code and subcode; Status::operator== compares the code only (any IOError)
+    db_->invalidate();
+  }
+  return result;
+}
+
+rocksdb::Status OpenRocksDB::Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value) {
+  rocksdb::Status result = impl_->Get(options, key, value);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+std::vector<rocksdb::Status> OpenRocksDB::MultiGet(const rocksdb::ReadOptions& options, const std::vector<rocksdb::Slice>& keys, std::vector<std::string>* values) {
+  std::vector<rocksdb::Status> results = impl_->MultiGet(options, keys, values);
+  for (const auto& result : results) {
+    if (result.IsNoSpace()) {
+      db_->invalidate();
+      break;
+    }
+  }
+  return results;
+}
+
+rocksdb::Status OpenRocksDB::Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates) {
+  rocksdb::Status result = impl_->Write(options, updates);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+rocksdb::Status OpenRocksDB::Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key) {
+  rocksdb::Status result = impl_->Delete(options, key);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+rocksdb::Status OpenRocksDB::Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value) {
+  rocksdb::Status result = impl_->Merge(options, key, value);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+bool OpenRocksDB::GetProperty(const rocksdb::Slice& property, std::string* value) {
+  return impl_->GetProperty(property, value);
+}
+
+std::unique_ptr<rocksdb::Iterator> OpenRocksDB::NewIterator(const rocksdb::ReadOptions& options) {
+  return std::unique_ptr<rocksdb::Iterator>{impl_->NewIterator(options)};
+}
+
+rocksdb::Status OpenRocksDB::NewCheckpoint(rocksdb::Checkpoint **checkpoint) {
+  return rocksdb::Checkpoint::Create(impl_.get(), checkpoint);
+}
+
+rocksdb::Status OpenRocksDB::FlushWAL(bool sync) {
+  rocksdb::Status result = impl_->FlushWAL(sync);
+  if (result.IsNoSpace()) {
+    db_->invalidate();
+  }
+  return result;
+}
+
+rocksdb::DB* OpenRocksDB::get() {
+  return impl_.get();
+}
+
+std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ = core::logging::LoggerFactory<RocksDatabase>::getLogger();
+
+RocksDatabase::RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode) : open_options_(options), db_name_(name), mode_(mode) {}
+
+void RocksDatabase::invalidate() {
+  std::lock_guard<std::mutex> db_guard{ mtx_ };
+  // discard our own instance; OpenRocksDB handles that still share it keep it alive until they are destroyed
+  impl_.reset();
+}
+
+utils::optional<OpenRocksDB> RocksDatabase::open() {
+  std::lock_guard<std::mutex> db_guard{ mtx_ };
+  if (!impl_) {
+    // database is not opened yet
+    rocksdb::DB* db_instance = nullptr;
+    rocksdb::Status result;
+    switch (mode_) {
+      case Mode::ReadWrite:
+        result = rocksdb::DB::Open(open_options_, db_name_, &db_instance);
+        if (!result.ok()) {
+          logger_->log_error("Cannot open writable rocksdb database %s, error: %s", db_name_, result.ToString());
+        }
+        break;
+      case Mode::ReadOnly:
+        result = rocksdb::DB::OpenForReadOnly(open_options_, db_name_, &db_instance);
+        if (!result.ok()) {
+          logger_->log_error("Cannot open read-only rocksdb database %s, error: %s", db_name_, result.ToString());
+        }
+        break;
+    }
+    if (db_instance != nullptr && result.ok()) {
+      impl_.reset(db_instance);
+    } else {
+      // we failed to open the database
+      return utils::nullopt;
+    }
+  }
+  return OpenRocksDB(*this, gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(impl_));
+}
+
+} /* namespace internal */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/rocksdb-repos/RocksDatabase.h b/extensions/rocksdb-repos/RocksDatabase.h
new file mode 100644
index 0000000..e67520b
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDatabase.h
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+
+#include "utils/OptionalUtils.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace internal {
+
+class RocksDatabase;
+
+// Not thread safe: a single OpenRocksDB handle must not be used from several threads concurrently
+class OpenRocksDB {
+  friend class RocksDatabase;
+
+  OpenRocksDB(RocksDatabase& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl);
+
+ public:
+  OpenRocksDB(const OpenRocksDB&) = delete;
+  OpenRocksDB(OpenRocksDB&&) noexcept = default;
+  OpenRocksDB& operator=(const OpenRocksDB&) = delete;
+  OpenRocksDB& operator=(OpenRocksDB&&) = default;
+
+  rocksdb::Status Put(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value);
+
+  rocksdb::Status Get(const rocksdb::ReadOptions& options, const rocksdb::Slice& key, std::string* value);
+
+  std::vector<rocksdb::Status> MultiGet(const rocksdb::ReadOptions& options, const std::vector<rocksdb::Slice>& keys, std::vector<std::string>* values);
+
+  rocksdb::Status Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch* updates);
+
+  rocksdb::Status Delete(const rocksdb::WriteOptions& options, const rocksdb::Slice& key);
+
+  rocksdb::Status Merge(const rocksdb::WriteOptions& options, const rocksdb::Slice& key, const rocksdb::Slice& value);
+
+  bool GetProperty(const rocksdb::Slice& property, std::string* value);
+
+  std::unique_ptr<rocksdb::Iterator> NewIterator(const rocksdb::ReadOptions& options);
+
+  rocksdb::Status NewCheckpoint(rocksdb::Checkpoint** checkpoint);
+
+  rocksdb::Status FlushWAL(bool sync);
+
+  rocksdb::DB* get();
+
+ private:
+  gsl::not_null<RocksDatabase*> db_;
+  gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
+};
+
+class RocksDatabase {
+  friend class OpenRocksDB;
+
+ public:
+  enum class Mode {
+    ReadOnly,
+    ReadWrite
+  };
+
+  RocksDatabase(const rocksdb::Options& options, const std::string& name, Mode mode = Mode::ReadWrite);
+  virtual ~RocksDatabase() = default;  // open() is virtual, so deletion through a base pointer must be safe
+  virtual utils::optional<OpenRocksDB> open();
+
+ private:
+  /*
+   * Drop the cached database instance so that the next open() attempts to reopen it;
+   * open() returns nullopt until such a reopen succeeds. OpenRocksDB handles keep their shared instance alive.
+   */
+  void invalidate();
+
+  const rocksdb::Options open_options_;
+  const std::string db_name_;
+  const Mode mode_;
+
+  std::mutex mtx_;
+  std::shared_ptr<rocksdb::DB> impl_;
+
+  static std::shared_ptr<core::logging::Logger> logger_;
+};
+
+} /* namespace internal */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index a6526ec..decb030 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -30,13 +30,14 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-RocksDbStream::RocksDbStream(std::string path, rocksdb::DB *db, bool write_enable)
+RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable)
     : BaseStream(),
       path_(std::move(path)),
       write_enable_(write_enable),
-      db_(db),
+      db_(std::move(db)),
       logger_(logging::LoggerFactory<RocksDbStream>::getLogger()) {
-  exists_ = db_->Get(rocksdb::ReadOptions(), path_, &value_).ok();
+  auto opendb = db_->open();
+  exists_ = opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok();  // an unopenable DB reads as "does not exist"
   offset_ = 0;
   size_ = value_.size();
 }
@@ -63,12 +64,16 @@ int RocksDbStream::writeData(std::vector<uint8_t> &buf, int buflen) {
 
 int RocksDbStream::writeData(uint8_t *value, int size) {
   if (!IsNullOrEmpty(value) && write_enable_) {
+    auto opendb = db_->open();
+    if (!opendb) {
+      return -1;
+    }
     rocksdb::Slice slice_value((const char *) value, size);
     rocksdb::Status status;
     size_ += size;
     rocksdb::WriteOptions opts;
     opts.sync = true;
-    db_->Merge(opts, path_, slice_value);
+    status = opendb->Merge(opts, path_, slice_value);  // capture the result so a failed merge is not reported as success
     if (status.ok()) {
       return 0;
     } else {
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 57f5c4e..c62d27b 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -18,7 +18,7 @@
 #ifndef LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
 #define LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
 
-#include "rocksdb/db.h"
+#include "RocksDatabase.h"
 #include <iostream>
 #include <cstdint>
 #include <string>
@@ -46,7 +46,7 @@ class RocksDbStream : public io::BaseStream {
    * File Stream constructor that accepts an fstream shared pointer.
    * It must already be initialized for read and write.
    */
-  explicit RocksDbStream(std::string path, rocksdb::DB *db, bool write_enable = false);
+  explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false);  // NOTE(review): the doc above describes a file stream; this takes a RocksDB key path and a non-owning database pointer
 
   ~RocksDbStream() override {
     closeStream();
@@ -160,7 +160,7 @@ class RocksDbStream : public io::BaseStream {
 
   std::string value_;
 
-  rocksdb::DB *db_;
+  gsl::not_null<minifi::internal::RocksDatabase*> db_;  // non-owning; NOTE(review): the RocksDatabase must outlive this stream -- confirm callers guarantee it
 
   size_t size_;
 
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
index ef04230..120f8e3 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -75,13 +75,12 @@ void RocksDbPersistableKeyValueStoreService::onEnable() {
   if (!always_persist_) {
     options.manual_wal_flush = true;
   }
-  rocksdb::DB* db = nullptr;
-  rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
-  if (status.ok()) {
-    db_.reset(db);
+  db_ = utils::make_unique<minifi::internal::RocksDatabase>(options, directory_);
+  if (db_->open()) {
     logger_->log_trace("Successfully opened RocksDB database at %s", directory_.c_str());
   } else {
-    logger_->log_error("Failed to open RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
+    // TODO(adebreceni) forward the status; RocksDatabase::open() already logs the rocksdb error text itself
+    logger_->log_error("Failed to open RocksDB database at %s", directory_.c_str());
     return;
   }
 
@@ -102,7 +101,11 @@ bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const s
   if (!db_) {
     return false;
   }
-  rocksdb::Status status = db_->Put(default_write_options, key, value);
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  rocksdb::Status status = opendb->Put(default_write_options, key, value);
   if (!status.ok()) {
     logger_->log_error("Failed to Put key %s to RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.getState());
     return false;
@@ -114,7 +117,11 @@ bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::st
   if (!db_) {
     return false;
   }
-  rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  rocksdb::Status status = opendb->Get(rocksdb::ReadOptions(), key, &value);
   if (!status.ok()) {
     logger_->log_error("Failed to Get key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.getState());
     return false;
@@ -126,8 +133,12 @@ bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string,
   if (!db_) {
     return false;
   }
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
   kvs.clear();
-  std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+  auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     kvs.emplace(it->key().ToString(), it->value().ToString());
   }
@@ -142,7 +153,11 @@ bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
   if (!db_) {
     return false;
   }
-  rocksdb::Status status = db_->Delete(default_write_options, key);
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  rocksdb::Status status = opendb->Delete(default_write_options, key);
   if (!status.ok()) {
     logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
     return false;
@@ -154,9 +169,13 @@ bool RocksDbPersistableKeyValueStoreService::clear() {
   if (!db_) {
     return false;
   }
-  std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    rocksdb::Status status = db_->Delete(default_write_options, it->key());
+    rocksdb::Status status = opendb->Delete(default_write_options, it->key());
     if (!status.ok()) {
       logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
       return false;
@@ -173,6 +192,10 @@ bool RocksDbPersistableKeyValueStoreService::update(const std::string& key, cons
   if (!db_) {
     return false;
   }
+  auto opendb = db_->open();  // NOTE(review): this handle is unused; update() throws below whenever the open succeeds
+  if (!opendb) {
+    return false;
+  }
   throw std::logic_error("Unsupported method");
 }
 
@@ -180,10 +203,14 @@ bool RocksDbPersistableKeyValueStoreService::persist() {
   if (!db_) {
     return false;
   }
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
   if (always_persist_) {
     return true;
   }
-  return db_->FlushWAL(true /*sync*/).ok();
+  return opendb->FlushWAL(true /*sync*/).ok();
 }
 
 } /* namespace controllers */
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
index 65ea554..95f6a1c 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
@@ -22,6 +22,7 @@
 #include "properties/Configure.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "../RocksDatabase.h"
 
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
@@ -69,7 +70,7 @@ class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyV
  protected:
   std::string directory_;
 
-  std::unique_ptr<rocksdb::DB> db_;
+  std::unique_ptr<minifi::internal::RocksDatabase> db_;
   rocksdb::WriteOptions default_write_options;
 
  private:
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index e56e301..a251b4e 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -48,10 +48,15 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
   REQUIRE(processor->getName() == "processorname");
 }
 
+// TODO(adebreceni): clarify the purpose of this test. It performs multiple onTriggers
+// with the same session, calls session->get() without a subsequent commit, and uses
+// the same repo for both the flowFileRepo and the provenance repo.
 TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
   TestController testController;
   LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
+  auto config = std::make_shared<minifi::Configure>();
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(config);
   std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
   processor->initialize();
   std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
@@ -89,8 +94,7 @@ TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
 
   processor->onSchedule(context, factory);
 
-  int prev = 0;
-  for (int i = 0; i < 10; i++) {
+  for (int i = 1; i < 10; i++) {
     auto session = std::make_shared<core::ProcessSession>(context);
     REQUIRE(processor->getName() == "getfileCreate2");
 
@@ -126,9 +130,11 @@ TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
     }
     session->commit();
     std::shared_ptr<core::FlowFile> ffr = session->get();
+    REQUIRE(ffr);  // GetFile successfully read the contents and created a flowFile
 
-    REQUIRE(repo->getRepoMap().size() == (prev + 1));
-    prev++;
+    // one CREATE, one MODIFY, and one FF contents, as we use the same
+    // underlying repo for both provenance and flowFileRepo
+    REQUIRE(repo->getRepoMap().size() == 3 * i);
   }
 }
 
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index df67f62..c6de75a 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -248,12 +248,10 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
     // Call the callback to write the content
     if (nullptr == stream) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
     }
     if (callback->process(stream) < 0) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
 
     flow->setSize(stream->getSize());
@@ -284,8 +282,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     uint64_t startTime = utils::timeutils::getTimeMillis();
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
     if (nullptr == stream) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
     }
     // Call the callback to write the content
 
@@ -294,8 +291,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     if (oldPos > 0)
       stream->seek(oldPos + 1);
     if (callback->process(stream) < 0) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
     flow->setSize(stream->getSize());
 
@@ -330,15 +326,13 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
 
     if (nullptr == stream) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
     }
 
     stream->seek(flow->getOffset());
 
     if (callback->process(stream) < 0) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
@@ -365,9 +359,7 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<co
     std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
 
     if (nullptr == content_stream) {
-      logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
     }
     size_t position = 0;
     const size_t max_size = stream.getSize();
@@ -412,8 +404,7 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
     if (nullptr == stream) {
-      rollback();
-      return;
+      throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
     }
     if (input.is_open() && input.good()) {
       bool invalidWrite = false;
@@ -532,8 +523,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
         }
         if (stream == nullptr) {
           logger_->log_error("Stream is null");
-          rollback();
-          return;
+          throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for import");
         }
         if (stream->write(begin, len) != len) {
           logger_->log_error("Error while writing");
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index b902009..254c1f5 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -54,7 +54,9 @@ int64_t ProcessSessionReadCallback::process(std::shared_ptr<io::BaseStream> stre
     if (read == 0) {
       break;
     }
-    _tmpFileOs.write(reinterpret_cast<char*>(buffer), read);
+    if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
+      return -1;
+    }
     size += read;
   } while (size < stream->getSize());
   _writeSucceeded = true;
@@ -69,6 +71,9 @@ bool ProcessSessionReadCallback::commit() {
   logger_->log_debug("committing export operation to %s", _destFile);
 
   if (_writeSucceeded) {
+    if (!_tmpFileOs.flush()) {
+      return false;
+    }
     _tmpFileOs.close();
 
     if (rename(_tmpFile.c_str(), _destFile.c_str())) {
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 870e926..15cc112 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -102,7 +102,9 @@ int FileStream::writeData(uint8_t *value, int size) {
       if (offset_ > length_) {
         length_ = offset_;
       }
-      file_stream_->flush();
+      if (!file_stream_->flush()) {
+        return -1;
+      }
       return size;
     } else {
       return -1;