You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/20 13:48:34 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1950 Disable in-memory buffering for FileSystemRepository

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 56302705f MINIFICPP-1950 Disable in-memory buffering for FileSystemRepository
56302705f is described below

commit 56302705fdcf91d5e1f5249cddb55577baa53b80
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Thu Oct 20 15:32:15 2022 +0200

    MINIFICPP-1950 Disable in-memory buffering for FileSystemRepository
    
    Closes #1429
    Signed-off-by: Marton Szasz <sz...@apache.org>
    Co-authored-by: Martin Zink <ma...@protonmail.com>
---
 .../rocksdb-repos/DatabaseContentRepository.cpp    | 10 +--
 .../rocksdb-repos/DatabaseContentRepository.h      |  4 +-
 .../{ContentSession.h => BufferedContentSession.h} | 45 ++++++--------
 libminifi/include/core/ContentSession.h            | 37 +++--------
 ...ContentSession.h => ForwardingContentSession.h} | 43 ++++++-------
 libminifi/include/core/Relationship.h              |  4 --
 .../include/core/repository/FileSystemRepository.h | 22 +++----
 ...ntentSession.cpp => BufferedContentSession.cpp} | 71 ++++++++++------------
 libminifi/src/core/ContentRepository.cpp           | 19 ++----
 libminifi/src/core/ForwardingContentSession.cpp    | 62 +++++++++++++++++++
 libminifi/src/core/ProcessSession.cpp              |  4 +-
 .../src/core/repository/FileSystemRepository.cpp   |  5 ++
 .../test/rocksdb-tests/ContentSessionTests.cpp     | 40 +++++++-----
 libminifi/test/unit/FileSystemRepositoryTests.cpp  | 53 ++++++++++++++++
 libminifi/test/unit/ProcessSessionTests.cpp        | 22 +++++--
 15 files changed, 263 insertions(+), 178 deletions(-)

diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 03e17a702..008deda72 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -77,7 +77,7 @@ void DatabaseContentRepository::stop() {
   db_.reset();
 }
 
-DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : ContentSession(std::move(repository)) {}
+DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {}
 
 std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
   return std::make_shared<Session>(sharedFromThis());
@@ -90,7 +90,7 @@ void DatabaseContentRepository::Session::commit() {
     throw Exception(REPOSITORY_EXCEPTION, "Couldn't open rocksdb database to commit content changes");
   }
   auto batch = opendb->createWriteBatch();
-  for (const auto& resource : managedResources_) {
+  for (const auto& resource : managed_resources_) {
     auto outStream = dbContentRepository->write(*resource.first, false, &batch);
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
@@ -100,7 +100,7 @@ void DatabaseContentRepository::Session::commit() {
       throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
     }
   }
-  for (const auto& resource : extendedResources_) {
+  for (const auto& resource : extended_resources_) {
     auto outStream = dbContentRepository->write(*resource.first, true, &batch);
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
@@ -118,8 +118,8 @@ void DatabaseContentRepository::Session::commit() {
     throw Exception(REPOSITORY_EXCEPTION, "Batch write failed: " + status.ToString());
   }
 
-  managedResources_.clear();
-  extendedResources_.clear();
+  managed_resources_.clear();
+  extended_resources_.clear();
 }
 
 std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 08e6a4896..d4c797310 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -22,7 +22,7 @@
 #include "core/Core.h"
 #include "core/Connectable.h"
 #include "core/ContentRepository.h"
-#include "core/ContentSession.h"
+#include "core/BufferedContentSession.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Property.h"
 #include "database/RocksDatabase.h"
@@ -34,7 +34,7 @@ namespace org::apache::nifi::minifi::core::repository {
  * DatabaseContentRepository is a content repository that stores data onto the local file system.
  */
 class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
-  class Session : public ContentSession {
+  class Session : public BufferedContentSession {
    public:
     explicit Session(std::shared_ptr<ContentRepository> repository);
 
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/BufferedContentSession.h
similarity index 61%
copy from libminifi/include/core/ContentSession.h
copy to libminifi/include/core/BufferedContentSession.h
index 2106f0cd4..2a0b2a02e 100644
--- a/libminifi/include/core/ContentSession.h
+++ b/libminifi/include/core/BufferedContentSession.h
@@ -18,49 +18,42 @@
 
 #pragma once
 
-#include <map>
 #include <memory>
+#include <map>
 #include "ResourceClaim.h"
 #include "io/BaseStream.h"
+#include "ContentSession.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 class ContentRepository;
 
-class ContentSession {
+/**
+ * Buffers the changes in-memory and forwards those to the repository on commit.
+ * Atomicity is NOT guaranteed in this implementation, override commit if needed.
+ * Rollback is possible.
+ */
+class BufferedContentSession : public ContentSession {
  public:
-  enum class WriteMode {
-    OVERWRITE,
-    APPEND
-  };
-
-  explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+  explicit BufferedContentSession(std::shared_ptr<ContentRepository> repository);
 
-  std::shared_ptr<ResourceClaim> create();
+  std::shared_ptr<ResourceClaim> create() override;
 
-  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resource_id) override;
 
-  std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+  std::shared_ptr<io::BaseStream> append(const std::shared_ptr<ResourceClaim>& resource_id) override;
 
-  virtual void commit();
+  std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) override;
 
-  void rollback();
+  void commit() override;
 
-  virtual ~ContentSession() = default;
+  void rollback() override;
 
  protected:
-  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managedResources_;
-  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extendedResources_;
+  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managed_resources_;
+  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extended_resources_;
   std::shared_ptr<ContentRepository> repository_;
 };
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
 
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/ContentSession.h
index 2106f0cd4..7b5bdcf1e 100644
--- a/libminifi/include/core/ContentSession.h
+++ b/libminifi/include/core/ContentSession.h
@@ -18,49 +18,28 @@
 
 #pragma once
 
-#include <map>
 #include <memory>
 #include "ResourceClaim.h"
 #include "io/BaseStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-class ContentRepository;
+namespace org::apache::nifi::minifi::core {
 
 class ContentSession {
  public:
-  enum class WriteMode {
-    OVERWRITE,
-    APPEND
-  };
-
-  explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+  virtual std::shared_ptr<ResourceClaim> create() = 0;
 
-  std::shared_ptr<ResourceClaim> create();
+  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
 
-  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+  virtual std::shared_ptr<io::BaseStream> append(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
 
-  std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
 
-  virtual void commit();
+  virtual void commit() = 0;
 
-  void rollback();
+  virtual void rollback() = 0;
 
   virtual ~ContentSession() = default;
-
- protected:
-  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managedResources_;
-  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extendedResources_;
-  std::shared_ptr<ContentRepository> repository_;
 };
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
 
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/ForwardingContentSession.h
similarity index 58%
copy from libminifi/include/core/ContentSession.h
copy to libminifi/include/core/ForwardingContentSession.h
index 2106f0cd4..b63f3179e 100644
--- a/libminifi/include/core/ContentSession.h
+++ b/libminifi/include/core/ForwardingContentSession.h
@@ -18,49 +18,40 @@
 
 #pragma once
 
-#include <map>
+#include <unordered_set>
 #include <memory>
 #include "ResourceClaim.h"
 #include "io/BaseStream.h"
+#include "ContentSession.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 class ContentRepository;
 
-class ContentSession {
+/**
+ * Warning: this implementation simply forwards all calls to the underlying
+ * repository without any atomicity guarantees or possibility of rollback.
+ */
+class ForwardingContentSession : public ContentSession {
  public:
-  enum class WriteMode {
-    OVERWRITE,
-    APPEND
-  };
-
-  explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+  explicit ForwardingContentSession(std::shared_ptr<ContentRepository> repository);
 
-  std::shared_ptr<ResourceClaim> create();
+  std::shared_ptr<ResourceClaim> create() override;
 
-  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resource_id) override;
 
-  std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+  std::shared_ptr<io::BaseStream> append(const std::shared_ptr<ResourceClaim>& resource_id) override;
 
-  virtual void commit();
+  std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) override;
 
-  void rollback();
+  void commit() override;
 
-  virtual ~ContentSession() = default;
+  void rollback() override;
 
  protected:
-  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managedResources_;
-  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extendedResources_;
+  std::unordered_set<std::shared_ptr<ResourceClaim>> created_claims_;
   std::shared_ptr<ContentRepository> repository_;
 };
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
 
diff --git a/libminifi/include/core/Relationship.h b/libminifi/include/core/Relationship.h
index 39aba57a2..492c84d96 100644
--- a/libminifi/include/core/Relationship.h
+++ b/libminifi/include/core/Relationship.h
@@ -34,10 +34,6 @@ class Relationship {
         description_(std::move(description)) {
   }
 
-  Relationship(const Relationship& other) = default;
-  Relationship &operator=(const Relationship& other) = default;
-  ~Relationship() = default;
-
   [[nodiscard]] std::string getName() const {
     return name_;
   }
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 54ce36b6c..68566be1f 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
+
+#pragma once
 
 #include <memory>
 #include <string>
@@ -38,26 +38,26 @@ class FileSystemRepository : public core::ContentRepository, public core::CoreCo
         logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
   }
 
-  virtual ~FileSystemRepository() = default;
+  ~FileSystemRepository() override = default;
 
-  virtual bool initialize(const std::shared_ptr<minifi::Configure>& configuration);
+  bool initialize(const std::shared_ptr<minifi::Configure>& configuration) override;
 
-  bool exists(const minifi::ResourceClaim& streamId);
+  bool exists(const minifi::ResourceClaim& streamId) override;
 
-  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false);
+  std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false) override;
 
-  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim);
+  std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim) override;
 
-  virtual bool close(const minifi::ResourceClaim& claim) {
+  bool close(const minifi::ResourceClaim& claim) override {
     return remove(claim);
   }
 
-  virtual bool remove(const minifi::ResourceClaim& claim);
+  bool remove(const minifi::ResourceClaim& claim) override;
+
+  std::shared_ptr<ContentSession> createSession() override;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
-
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
diff --git a/libminifi/src/core/ContentSession.cpp b/libminifi/src/core/BufferedContentSession.cpp
similarity index 58%
rename from libminifi/src/core/ContentSession.cpp
rename to libminifi/src/core/BufferedContentSession.cpp
index 35a953755..111ac85b4 100644
--- a/libminifi/src/core/ContentSession.cpp
+++ b/libminifi/src/core/BufferedContentSession.cpp
@@ -16,58 +16,53 @@
  * limitations under the License.
  */
 
+#include "core/BufferedContentSession.h"
 #include <memory>
 #include "core/ContentRepository.h"
-#include "core/ContentSession.h"
 #include "ResourceClaim.h"
 #include "io/BaseStream.h"
 #include "Exception.h"
-#include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
-ContentSession::ContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
+BufferedContentSession::BufferedContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
 
-std::shared_ptr<ResourceClaim> ContentSession::create() {
+std::shared_ptr<ResourceClaim> BufferedContentSession::create() {
   std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(repository_);
-  managedResources_[claim] = std::make_shared<io::BufferStream>();
+  managed_resources_[claim] = std::make_shared<io::BufferStream>();
   return claim;
 }
 
-std::shared_ptr<io::BaseStream> ContentSession::write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode) {
-  auto it = managedResources_.find(resourceId);
-  if (it == managedResources_.end()) {
-    if (mode == WriteMode::OVERWRITE) {
-      throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
-    }
-    auto& extension = extendedResources_[resourceId];
-    if (!extension) {
-      extension = std::make_shared<io::BufferStream>();
-    }
-    return extension;
+std::shared_ptr<io::BaseStream> BufferedContentSession::write(const std::shared_ptr<ResourceClaim>& resource_id) {
+  if (auto it = managed_resources_.find(resource_id); it != managed_resources_.end()) {
+    return it->second = std::make_shared<io::BufferStream>();
+  }
+  throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
+}
+
+std::shared_ptr<io::BaseStream> BufferedContentSession::append(const std::shared_ptr<ResourceClaim>& resource_id) {
+  if (auto it = managed_resources_.find(resource_id); it != managed_resources_.end()) {
+    return it->second;
   }
-  if (mode == WriteMode::OVERWRITE) {
-    it->second = std::make_shared<io::BufferStream>();
+  auto& extension = extended_resources_[resource_id];
+  if (!extension) {
+    extension = std::make_shared<io::BufferStream>();
   }
-  return it->second;
+  return extension;
 }
 
-std::shared_ptr<io::BaseStream> ContentSession::read(const std::shared_ptr<ResourceClaim>& resourceId) {
+std::shared_ptr<io::BaseStream> BufferedContentSession::read(const std::shared_ptr<ResourceClaim>& resource_id) {
   // TODO(adebreceni):
   //  after the stream refactor is merged we should be able to share the underlying buffer
   //  between multiple InputStreams, moreover create a ConcatInputStream
-  if (managedResources_.find(resourceId) != managedResources_.end() || extendedResources_.find(resourceId) != extendedResources_.end()) {
+  if (managed_resources_.contains(resource_id) || extended_resources_.contains(resource_id)) {
     throw Exception(REPOSITORY_EXCEPTION, "Can only read non-modified resource");
   }
-  return repository_->read(*resourceId);
+  return repository_->read(*resource_id);
 }
 
-void ContentSession::commit() {
-  for (const auto& resource : managedResources_) {
+void BufferedContentSession::commit() {
+  for (const auto& resource : managed_resources_) {
     auto outStream = repository_->write(*resource.first);
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
@@ -78,7 +73,7 @@ void ContentSession::commit() {
       throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
     }
   }
-  for (const auto& resource : extendedResources_) {
+  for (const auto& resource : extended_resources_) {
     auto outStream = repository_->write(*resource.first, true);
     if (outStream == nullptr) {
       throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
@@ -90,18 +85,14 @@ void ContentSession::commit() {
     }
   }
 
-  managedResources_.clear();
-  extendedResources_.clear();
+  managed_resources_.clear();
+  extended_resources_.clear();
 }
 
-void ContentSession::rollback() {
-  managedResources_.clear();
-  extendedResources_.clear();
+void BufferedContentSession::rollback() {
+  managed_resources_.clear();
+  extended_resources_.clear();
 }
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
 
diff --git a/libminifi/src/core/ContentRepository.cpp b/libminifi/src/core/ContentRepository.cpp
index c4d06edb9..a13032c7b 100644
--- a/libminifi/src/core/ContentRepository.cpp
+++ b/libminifi/src/core/ContentRepository.cpp
@@ -16,18 +16,15 @@
  * limitations under the License.
  */
 
+#include "core/ContentRepository.h"
+
 #include <map>
 #include <memory>
 #include <string>
 
-#include "core/ContentRepository.h"
-#include "core/ContentSession.h"
+#include "core/BufferedContentSession.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 std::string ContentRepository::getStoragePath() const {
   return directory_;
@@ -39,7 +36,7 @@ void ContentRepository::reset() {
 }
 
 std::shared_ptr<ContentSession> ContentRepository::createSession() {
-  return std::make_shared<ContentSession>(sharedFromThis());
+  return std::make_shared<BufferedContentSession>(sharedFromThis());
 }
 
 uint32_t ContentRepository::getStreamCount(const minifi::ResourceClaim &streamId) {
@@ -77,8 +74,4 @@ ContentRepository::StreamState ContentRepository::decrementStreamCount(const min
   }
 }
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/ForwardingContentSession.cpp b/libminifi/src/core/ForwardingContentSession.cpp
new file mode 100644
index 000000000..304612597
--- /dev/null
+++ b/libminifi/src/core/ForwardingContentSession.cpp
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ForwardingContentSession.h"
+
+#include <memory>
+
+#include "core/ContentRepository.h"
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+#include "Exception.h"
+
+namespace org::apache::nifi::minifi::core {
+
+ForwardingContentSession::ForwardingContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
+
+std::shared_ptr<ResourceClaim> ForwardingContentSession::create() {
+  auto claim = std::make_shared<ResourceClaim>(repository_);
+  created_claims_.insert(claim);
+  return claim;
+}
+
+std::shared_ptr<io::BaseStream> ForwardingContentSession::write(const std::shared_ptr<ResourceClaim>& resource_id) {
+  if (!created_claims_.contains(resource_id)) {
+    throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
+  }
+  return repository_->write(*resource_id, false);
+}
+
+std::shared_ptr<io::BaseStream> ForwardingContentSession::append(const std::shared_ptr<ResourceClaim>& resource_id) {
+  return repository_->write(*resource_id, true);
+}
+
+std::shared_ptr<io::BaseStream> ForwardingContentSession::read(const std::shared_ptr<ResourceClaim>& resource_id) {
+  return repository_->read(*resource_id);
+}
+
+void ForwardingContentSession::commit() {
+  created_claims_.clear();
+}
+
+void ForwardingContentSession::rollback() {
+  created_claims_.clear();
+}
+
+}  // namespace org::apache::nifi::minifi::core
+
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 7eca66138..d2b8e7580 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -286,7 +286,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const i
 
   try {
     auto start_time = std::chrono::steady_clock::now();
-    std::shared_ptr<io::BaseStream> stream = content_session_->write(claim, ContentSession::WriteMode::APPEND);
+    std::shared_ptr<io::BaseStream> stream = content_session_->append(claim);
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
     }
@@ -296,7 +296,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const i
     size_t stream_size_before_callback = stream->size();
     // this prevents an issue if we write, above, with zero length.
     if (stream_size_before_callback > 0)
-      stream->seek(stream_size_before_callback + 1);
+      stream->seek(stream_size_before_callback);
     if (callback(stream) < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 85f42bfb4..92df30d9f 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -21,6 +21,7 @@
 #include <string>
 #include "io/FileStream.h"
 #include "utils/file/FileUtils.h"
+#include "core/ForwardingContentSession.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -54,4 +55,8 @@ bool FileSystemRepository::remove(const minifi::ResourceClaim& claim) {
   return true;
 }
 
+std::shared_ptr<ContentSession> FileSystemRepository::createSession() {
+  return std::make_shared<ForwardingContentSession>(sharedFromThis());
+}
+
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index a73e9c19e..638850c4e 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -24,6 +24,7 @@
 #include "FileSystemRepository.h"
 #include "VolatileContentRepository.h"
 #include "DatabaseContentRepository.h"
+#include "BufferedContentSession.h"
 #include "FlowFileRecord.h"
 #include "../TestBase.h"
 #include "../Catch.h"
@@ -72,7 +73,6 @@ void test_template() {
   ContentSessionController<ContentRepositoryClass> controller;
   std::shared_ptr<core::ContentRepository> contentRepository = controller.contentRepository;
 
-
   std::shared_ptr<minifi::ResourceClaim> oldClaim;
   {
     auto session = contentRepository->createSession();
@@ -82,23 +82,32 @@ void test_template() {
   }
 
   auto session = contentRepository->createSession();
+  const bool is_buffered_session = std::dynamic_pointer_cast<core::BufferedContentSession>(session) != nullptr;
+
   REQUIRE_THROWS(session->write(oldClaim));
 
   REQUIRE_NOTHROW(session->read(oldClaim));
-  session->write(oldClaim, core::ContentSession::WriteMode::APPEND) << "-addendum";
-  REQUIRE_THROWS(session->read(oldClaim));  // now throws because we appended to the content
+
+  session->append(oldClaim) << "-addendum";
+  // TODO(adebreceni): MINIFICPP-1954
+  if (is_buffered_session) {
+    REQUIRE_THROWS(session->read(oldClaim));
+  }
 
   auto claim1 = session->create();
   session->write(claim1) << "hello content!";
-  REQUIRE_THROWS(session->read(claim1));  // TODO(adebreceni): we currently have no means to create joined streams
+  // TODO(adebreceni): MINIFICPP-1954
+  if (is_buffered_session) {
+    REQUIRE_THROWS(session->read(claim1));
+  }
 
   auto claim2 = session->create();
-  session->write(claim2, core::ContentSession::WriteMode::APPEND) << "beginning";
-  session->write(claim2, core::ContentSession::WriteMode::APPEND) << "-end";
+  session->append(claim2) << "beginning";
+  session->append(claim2) << "-end";
 
   auto claim3 = session->create();
   session->write(claim3) << "first";
-  session->write(claim3, core::ContentSession::WriteMode::APPEND) << "-last";
+  session->append(claim3) << "-last";
 
   auto claim4 = session->create();
   session->write(claim4) << "beginning";
@@ -127,14 +136,15 @@ void test_template() {
   SECTION("Rollback") {
     session->rollback();
 
-    std::string content;
-    contentRepository->read(*oldClaim) >> content;
-    REQUIRE(content == "data");
-
-    REQUIRE(!contentRepository->exists(*claim1));
-    REQUIRE(!contentRepository->exists(*claim2));
-    REQUIRE(!contentRepository->exists(*claim3));
-    REQUIRE(!contentRepository->exists(*claim4));
+    if (is_buffered_session) {
+      std::string content;
+      contentRepository->read(*oldClaim) >> content;
+      REQUIRE(content == "data");
+      REQUIRE(!contentRepository->exists(*claim1));
+      REQUIRE(!contentRepository->exists(*claim2));
+      REQUIRE(!contentRepository->exists(*claim3));
+      REQUIRE(!contentRepository->exists(*claim4));
+    }
   }
 }
 
diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp
new file mode 100644
index 000000000..d98d4a68e
--- /dev/null
+++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// loading extensions increases the baseline memory usage
+// as we measure the absolute memory usage that would fail this test
+#define EXTENSION_LIST ""
+
+#include <cstring>
+
+#include "utils/gsl.h"
+#include "utils/OsUtils.h"
+#include "../TestBase.h"
+#include "../Catch.h"
+#include "utils/Literals.h"
+#include "core/repository/FileSystemRepository.h"
+
+TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
+  TestController controller;
+  auto dir = controller.createTempDirectory();
+  auto fs_repo = std::make_shared<minifi::core::repository::FileSystemRepository>();
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+  REQUIRE(fs_repo->initialize(config));
+  const auto start_memory = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+
+  auto content_session = fs_repo->createSession();
+  auto resource_id = content_session->create();
+  auto stream = content_session->write(resource_id);
+  size_t file_size = 20_MB;
+  gsl::span<const char> fragment = "well, hello there";
+  for (size_t i = 0; i < file_size / fragment.size() + 1; ++i) {
+    stream->write(fragment.as_span<const std::byte>());
+  }
+
+  const auto end_memory = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+
+  REQUIRE(gsl::narrow<size_t>(end_memory - start_memory) < 1_MB);
+}
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 26a9d1d68..f6df20829 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -115,11 +115,23 @@ TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[reado
 }
 
 TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly" "[appendsetsize]") {
-  ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
-  ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
-
-  ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
-  ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+  SECTION("Unmanaged") {
+    SECTION("VolatileContentRepository") {
+      ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+    }
+    SECTION("FileSystemRepository") {
+      ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+    }
+  }
+
+  SECTION("Managed") {
+    SECTION("VolatileContentRepository") {
+      ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+    }
+    SECTION("FileSystemRepository") {
+      ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+    }
+  }
 }
 
 TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "[zerolengthread]") {