You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/20 13:48:34 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1950 Disable in-memory buffering for FileSystemRepository
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 56302705f MINIFICPP-1950 Disable in-memory buffering for FileSystemRepository
56302705f is described below
commit 56302705fdcf91d5e1f5249cddb55577baa53b80
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Thu Oct 20 15:32:15 2022 +0200
MINIFICPP-1950 Disable in-memory buffering for FileSystemRepository
Closes #1429
Signed-off-by: Marton Szasz <sz...@apache.org>
Co-authored-by: Martin Zink <ma...@protonmail.com>
---
.../rocksdb-repos/DatabaseContentRepository.cpp | 10 +--
.../rocksdb-repos/DatabaseContentRepository.h | 4 +-
.../{ContentSession.h => BufferedContentSession.h} | 45 ++++++--------
libminifi/include/core/ContentSession.h | 37 +++--------
...ContentSession.h => ForwardingContentSession.h} | 43 ++++++-------
libminifi/include/core/Relationship.h | 4 --
.../include/core/repository/FileSystemRepository.h | 22 +++----
...ntentSession.cpp => BufferedContentSession.cpp} | 71 ++++++++++------------
libminifi/src/core/ContentRepository.cpp | 19 ++----
libminifi/src/core/ForwardingContentSession.cpp | 62 +++++++++++++++++++
libminifi/src/core/ProcessSession.cpp | 4 +-
.../src/core/repository/FileSystemRepository.cpp | 5 ++
.../test/rocksdb-tests/ContentSessionTests.cpp | 40 +++++++-----
libminifi/test/unit/FileSystemRepositoryTests.cpp | 53 ++++++++++++++++
libminifi/test/unit/ProcessSessionTests.cpp | 22 +++++--
15 files changed, 263 insertions(+), 178 deletions(-)
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 03e17a702..008deda72 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -77,7 +77,7 @@ void DatabaseContentRepository::stop() {
db_.reset();
}
-DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : ContentSession(std::move(repository)) {}
+DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {}
std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
return std::make_shared<Session>(sharedFromThis());
@@ -90,7 +90,7 @@ void DatabaseContentRepository::Session::commit() {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open rocksdb database to commit content changes");
}
auto batch = opendb->createWriteBatch();
- for (const auto& resource : managedResources_) {
+ for (const auto& resource : managed_resources_) {
auto outStream = dbContentRepository->write(*resource.first, false, &batch);
if (outStream == nullptr) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
@@ -100,7 +100,7 @@ void DatabaseContentRepository::Session::commit() {
throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
}
}
- for (const auto& resource : extendedResources_) {
+ for (const auto& resource : extended_resources_) {
auto outStream = dbContentRepository->write(*resource.first, true, &batch);
if (outStream == nullptr) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
@@ -118,8 +118,8 @@ void DatabaseContentRepository::Session::commit() {
throw Exception(REPOSITORY_EXCEPTION, "Batch write failed: " + status.ToString());
}
- managedResources_.clear();
- extendedResources_.clear();
+ managed_resources_.clear();
+ extended_resources_.clear();
}
std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 08e6a4896..d4c797310 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -22,7 +22,7 @@
#include "core/Core.h"
#include "core/Connectable.h"
#include "core/ContentRepository.h"
-#include "core/ContentSession.h"
+#include "core/BufferedContentSession.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/Property.h"
#include "database/RocksDatabase.h"
@@ -34,7 +34,7 @@ namespace org::apache::nifi::minifi::core::repository {
* DatabaseContentRepository is a content repository that stores data onto the local file system.
*/
class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
- class Session : public ContentSession {
+ class Session : public BufferedContentSession {
public:
explicit Session(std::shared_ptr<ContentRepository> repository);
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/BufferedContentSession.h
similarity index 61%
copy from libminifi/include/core/ContentSession.h
copy to libminifi/include/core/BufferedContentSession.h
index 2106f0cd4..2a0b2a02e 100644
--- a/libminifi/include/core/ContentSession.h
+++ b/libminifi/include/core/BufferedContentSession.h
@@ -18,49 +18,42 @@
#pragma once
-#include <map>
#include <memory>
+#include <map>
#include "ResourceClaim.h"
#include "io/BaseStream.h"
+#include "ContentSession.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
class ContentRepository;
-class ContentSession {
+/**
+ * Buffers the changes in-memory and forwards those to the repository on commit.
+ * Atomicity is NOT guaranteed in this implementation, override commit if needed.
+ * Rollback is possible.
+ */
+class BufferedContentSession : public ContentSession {
public:
- enum class WriteMode {
- OVERWRITE,
- APPEND
- };
-
- explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+ explicit BufferedContentSession(std::shared_ptr<ContentRepository> repository);
- std::shared_ptr<ResourceClaim> create();
+ std::shared_ptr<ResourceClaim> create() override;
- std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+ std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resource_id) override;
- std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+ std::shared_ptr<io::BaseStream> append(const std::shared_ptr<ResourceClaim>& resource_id) override;
- virtual void commit();
+ std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) override;
- void rollback();
+ void commit() override;
- virtual ~ContentSession() = default;
+ void rollback() override;
protected:
- std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managedResources_;
- std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extendedResources_;
+ std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managed_resources_;
+ std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extended_resources_;
std::shared_ptr<ContentRepository> repository_;
};
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/ContentSession.h
index 2106f0cd4..7b5bdcf1e 100644
--- a/libminifi/include/core/ContentSession.h
+++ b/libminifi/include/core/ContentSession.h
@@ -18,49 +18,28 @@
#pragma once
-#include <map>
#include <memory>
#include "ResourceClaim.h"
#include "io/BaseStream.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-class ContentRepository;
+namespace org::apache::nifi::minifi::core {
class ContentSession {
public:
- enum class WriteMode {
- OVERWRITE,
- APPEND
- };
-
- explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+ virtual std::shared_ptr<ResourceClaim> create() = 0;
- std::shared_ptr<ResourceClaim> create();
+ virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
- std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+ virtual std::shared_ptr<io::BaseStream> append(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
- std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+ virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) = 0;
- virtual void commit();
+ virtual void commit() = 0;
- void rollback();
+ virtual void rollback() = 0;
virtual ~ContentSession() = default;
-
- protected:
- std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managedResources_;
- std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extendedResources_;
- std::shared_ptr<ContentRepository> repository_;
};
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/ForwardingContentSession.h
similarity index 58%
copy from libminifi/include/core/ContentSession.h
copy to libminifi/include/core/ForwardingContentSession.h
index 2106f0cd4..b63f3179e 100644
--- a/libminifi/include/core/ContentSession.h
+++ b/libminifi/include/core/ForwardingContentSession.h
@@ -18,49 +18,40 @@
#pragma once
-#include <map>
+#include <unordered_set>
#include <memory>
#include "ResourceClaim.h"
#include "io/BaseStream.h"
+#include "ContentSession.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
class ContentRepository;
-class ContentSession {
+/**
+ * Warning: this implementation simply forwards all calls to the underlying
+ * repository without any atomicity guarantees or possibility of rollback.
+ */
+class ForwardingContentSession : public ContentSession {
public:
- enum class WriteMode {
- OVERWRITE,
- APPEND
- };
-
- explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+ explicit ForwardingContentSession(std::shared_ptr<ContentRepository> repository);
- std::shared_ptr<ResourceClaim> create();
+ std::shared_ptr<ResourceClaim> create() override;
- std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+ std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resource_id) override;
- std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+ std::shared_ptr<io::BaseStream> append(const std::shared_ptr<ResourceClaim>& resource_id) override;
- virtual void commit();
+ std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resource_id) override;
- void rollback();
+ void commit() override;
- virtual ~ContentSession() = default;
+ void rollback() override;
protected:
- std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> managedResources_;
- std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BufferStream>> extendedResources_;
+ std::unordered_set<std::shared_ptr<ResourceClaim>> created_claims_;
std::shared_ptr<ContentRepository> repository_;
};
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/Relationship.h b/libminifi/include/core/Relationship.h
index 39aba57a2..492c84d96 100644
--- a/libminifi/include/core/Relationship.h
+++ b/libminifi/include/core/Relationship.h
@@ -34,10 +34,6 @@ class Relationship {
description_(std::move(description)) {
}
- Relationship(const Relationship& other) = default;
- Relationship &operator=(const Relationship& other) = default;
- ~Relationship() = default;
-
[[nodiscard]] std::string getName() const {
return name_;
}
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 54ce36b6c..68566be1f 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
+
+#pragma once
#include <memory>
#include <string>
@@ -38,26 +38,26 @@ class FileSystemRepository : public core::ContentRepository, public core::CoreCo
logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
}
- virtual ~FileSystemRepository() = default;
+ ~FileSystemRepository() override = default;
- virtual bool initialize(const std::shared_ptr<minifi::Configure>& configuration);
+ bool initialize(const std::shared_ptr<minifi::Configure>& configuration) override;
- bool exists(const minifi::ResourceClaim& streamId);
+ bool exists(const minifi::ResourceClaim& streamId) override;
- virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false);
+ std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false) override;
- virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim);
+ std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim) override;
- virtual bool close(const minifi::ResourceClaim& claim) {
+ bool close(const minifi::ResourceClaim& claim) override {
return remove(claim);
}
- virtual bool remove(const minifi::ResourceClaim& claim);
+ bool remove(const minifi::ResourceClaim& claim) override;
+
+ std::shared_ptr<ContentSession> createSession() override;
private:
std::shared_ptr<logging::Logger> logger_;
};
} // namespace org::apache::nifi::minifi::core::repository
-
-#endif // LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
diff --git a/libminifi/src/core/ContentSession.cpp b/libminifi/src/core/BufferedContentSession.cpp
similarity index 58%
rename from libminifi/src/core/ContentSession.cpp
rename to libminifi/src/core/BufferedContentSession.cpp
index 35a953755..111ac85b4 100644
--- a/libminifi/src/core/ContentSession.cpp
+++ b/libminifi/src/core/BufferedContentSession.cpp
@@ -16,58 +16,53 @@
* limitations under the License.
*/
+#include "core/BufferedContentSession.h"
#include <memory>
#include "core/ContentRepository.h"
-#include "core/ContentSession.h"
#include "ResourceClaim.h"
#include "io/BaseStream.h"
#include "Exception.h"
-#include "utils/gsl.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
-ContentSession::ContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
+BufferedContentSession::BufferedContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
-std::shared_ptr<ResourceClaim> ContentSession::create() {
+std::shared_ptr<ResourceClaim> BufferedContentSession::create() {
std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(repository_);
- managedResources_[claim] = std::make_shared<io::BufferStream>();
+ managed_resources_[claim] = std::make_shared<io::BufferStream>();
return claim;
}
-std::shared_ptr<io::BaseStream> ContentSession::write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode) {
- auto it = managedResources_.find(resourceId);
- if (it == managedResources_.end()) {
- if (mode == WriteMode::OVERWRITE) {
- throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
- }
- auto& extension = extendedResources_[resourceId];
- if (!extension) {
- extension = std::make_shared<io::BufferStream>();
- }
- return extension;
+std::shared_ptr<io::BaseStream> BufferedContentSession::write(const std::shared_ptr<ResourceClaim>& resource_id) {
+ if (auto it = managed_resources_.find(resource_id); it != managed_resources_.end()) {
+ return it->second = std::make_shared<io::BufferStream>();
+ }
+ throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
+}
+
+std::shared_ptr<io::BaseStream> BufferedContentSession::append(const std::shared_ptr<ResourceClaim>& resource_id) {
+ if (auto it = managed_resources_.find(resource_id); it != managed_resources_.end()) {
+ return it->second;
}
- if (mode == WriteMode::OVERWRITE) {
- it->second = std::make_shared<io::BufferStream>();
+ auto& extension = extended_resources_[resource_id];
+ if (!extension) {
+ extension = std::make_shared<io::BufferStream>();
}
- return it->second;
+ return extension;
}
-std::shared_ptr<io::BaseStream> ContentSession::read(const std::shared_ptr<ResourceClaim>& resourceId) {
+std::shared_ptr<io::BaseStream> BufferedContentSession::read(const std::shared_ptr<ResourceClaim>& resource_id) {
// TODO(adebreceni):
// after the stream refactor is merged we should be able to share the underlying buffer
// between multiple InputStreams, moreover create a ConcatInputStream
- if (managedResources_.find(resourceId) != managedResources_.end() || extendedResources_.find(resourceId) != extendedResources_.end()) {
+ if (managed_resources_.contains(resource_id) || extended_resources_.contains(resource_id)) {
throw Exception(REPOSITORY_EXCEPTION, "Can only read non-modified resource");
}
- return repository_->read(*resourceId);
+ return repository_->read(*resource_id);
}
-void ContentSession::commit() {
- for (const auto& resource : managedResources_) {
+void BufferedContentSession::commit() {
+ for (const auto& resource : managed_resources_) {
auto outStream = repository_->write(*resource.first);
if (outStream == nullptr) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
@@ -78,7 +73,7 @@ void ContentSession::commit() {
throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
}
}
- for (const auto& resource : extendedResources_) {
+ for (const auto& resource : extended_resources_) {
auto outStream = repository_->write(*resource.first, true);
if (outStream == nullptr) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
@@ -90,18 +85,14 @@ void ContentSession::commit() {
}
}
- managedResources_.clear();
- extendedResources_.clear();
+ managed_resources_.clear();
+ extended_resources_.clear();
}
-void ContentSession::rollback() {
- managedResources_.clear();
- extendedResources_.clear();
+void BufferedContentSession::rollback() {
+ managed_resources_.clear();
+ extended_resources_.clear();
}
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/ContentRepository.cpp b/libminifi/src/core/ContentRepository.cpp
index c4d06edb9..a13032c7b 100644
--- a/libminifi/src/core/ContentRepository.cpp
+++ b/libminifi/src/core/ContentRepository.cpp
@@ -16,18 +16,15 @@
* limitations under the License.
*/
+#include "core/ContentRepository.h"
+
#include <map>
#include <memory>
#include <string>
-#include "core/ContentRepository.h"
-#include "core/ContentSession.h"
+#include "core/BufferedContentSession.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
std::string ContentRepository::getStoragePath() const {
return directory_;
@@ -39,7 +36,7 @@ void ContentRepository::reset() {
}
std::shared_ptr<ContentSession> ContentRepository::createSession() {
- return std::make_shared<ContentSession>(sharedFromThis());
+ return std::make_shared<BufferedContentSession>(sharedFromThis());
}
uint32_t ContentRepository::getStreamCount(const minifi::ResourceClaim &streamId) {
@@ -77,8 +74,4 @@ ContentRepository::StreamState ContentRepository::decrementStreamCount(const min
}
}
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/ForwardingContentSession.cpp b/libminifi/src/core/ForwardingContentSession.cpp
new file mode 100644
index 000000000..304612597
--- /dev/null
+++ b/libminifi/src/core/ForwardingContentSession.cpp
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ForwardingContentSession.h"
+
+#include <memory>
+
+#include "core/ContentRepository.h"
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+#include "Exception.h"
+
+namespace org::apache::nifi::minifi::core {
+
+ForwardingContentSession::ForwardingContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
+
+std::shared_ptr<ResourceClaim> ForwardingContentSession::create() {
+ auto claim = std::make_shared<ResourceClaim>(repository_);
+ created_claims_.insert(claim);
+ return claim;
+}
+
+std::shared_ptr<io::BaseStream> ForwardingContentSession::write(const std::shared_ptr<ResourceClaim>& resource_id) {
+ if (!created_claims_.contains(resource_id)) {
+ throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
+ }
+ return repository_->write(*resource_id, false);
+}
+
+std::shared_ptr<io::BaseStream> ForwardingContentSession::append(const std::shared_ptr<ResourceClaim>& resource_id) {
+ return repository_->write(*resource_id, true);
+}
+
+std::shared_ptr<io::BaseStream> ForwardingContentSession::read(const std::shared_ptr<ResourceClaim>& resource_id) {
+ return repository_->read(*resource_id);
+}
+
+void ForwardingContentSession::commit() {
+ created_claims_.clear();
+}
+
+void ForwardingContentSession::rollback() {
+ created_claims_.clear();
+}
+
+} // namespace org::apache::nifi::minifi::core
+
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 7eca66138..d2b8e7580 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -286,7 +286,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const i
try {
auto start_time = std::chrono::steady_clock::now();
- std::shared_ptr<io::BaseStream> stream = content_session_->write(claim, ContentSession::WriteMode::APPEND);
+ std::shared_ptr<io::BaseStream> stream = content_session_->append(claim);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
}
@@ -296,7 +296,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const i
size_t stream_size_before_callback = stream->size();
// this prevents an issue if we write, above, with zero length.
if (stream_size_before_callback > 0)
- stream->seek(stream_size_before_callback + 1);
+ stream->seek(stream_size_before_callback);
if (callback(stream) < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
}
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 85f42bfb4..92df30d9f 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -21,6 +21,7 @@
#include <string>
#include "io/FileStream.h"
#include "utils/file/FileUtils.h"
+#include "core/ForwardingContentSession.h"
namespace org::apache::nifi::minifi::core::repository {
@@ -54,4 +55,8 @@ bool FileSystemRepository::remove(const minifi::ResourceClaim& claim) {
return true;
}
+std::shared_ptr<ContentSession> FileSystemRepository::createSession() {
+ return std::make_shared<ForwardingContentSession>(sharedFromThis());
+}
+
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index a73e9c19e..638850c4e 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -24,6 +24,7 @@
#include "FileSystemRepository.h"
#include "VolatileContentRepository.h"
#include "DatabaseContentRepository.h"
+#include "BufferedContentSession.h"
#include "FlowFileRecord.h"
#include "../TestBase.h"
#include "../Catch.h"
@@ -72,7 +73,6 @@ void test_template() {
ContentSessionController<ContentRepositoryClass> controller;
std::shared_ptr<core::ContentRepository> contentRepository = controller.contentRepository;
-
std::shared_ptr<minifi::ResourceClaim> oldClaim;
{
auto session = contentRepository->createSession();
@@ -82,23 +82,32 @@ void test_template() {
}
auto session = contentRepository->createSession();
+ const bool is_buffered_session = std::dynamic_pointer_cast<core::BufferedContentSession>(session) != nullptr;
+
REQUIRE_THROWS(session->write(oldClaim));
REQUIRE_NOTHROW(session->read(oldClaim));
- session->write(oldClaim, core::ContentSession::WriteMode::APPEND) << "-addendum";
- REQUIRE_THROWS(session->read(oldClaim)); // now throws because we appended to the content
+
+ session->append(oldClaim) << "-addendum";
+ // TODO(adebreceni): MINIFICPP-1954
+ if (is_buffered_session) {
+ REQUIRE_THROWS(session->read(oldClaim));
+ }
auto claim1 = session->create();
session->write(claim1) << "hello content!";
- REQUIRE_THROWS(session->read(claim1)); // TODO(adebreceni): we currently have no means to create joined streams
+ // TODO(adebreceni): MINIFICPP-1954
+ if (is_buffered_session) {
+ REQUIRE_THROWS(session->read(claim1));
+ }
auto claim2 = session->create();
- session->write(claim2, core::ContentSession::WriteMode::APPEND) << "beginning";
- session->write(claim2, core::ContentSession::WriteMode::APPEND) << "-end";
+ session->append(claim2) << "beginning";
+ session->append(claim2) << "-end";
auto claim3 = session->create();
session->write(claim3) << "first";
- session->write(claim3, core::ContentSession::WriteMode::APPEND) << "-last";
+ session->append(claim3) << "-last";
auto claim4 = session->create();
session->write(claim4) << "beginning";
@@ -127,14 +136,15 @@ void test_template() {
SECTION("Rollback") {
session->rollback();
- std::string content;
- contentRepository->read(*oldClaim) >> content;
- REQUIRE(content == "data");
-
- REQUIRE(!contentRepository->exists(*claim1));
- REQUIRE(!contentRepository->exists(*claim2));
- REQUIRE(!contentRepository->exists(*claim3));
- REQUIRE(!contentRepository->exists(*claim4));
+ if (is_buffered_session) {
+ std::string content;
+ contentRepository->read(*oldClaim) >> content;
+ REQUIRE(content == "data");
+ REQUIRE(!contentRepository->exists(*claim1));
+ REQUIRE(!contentRepository->exists(*claim2));
+ REQUIRE(!contentRepository->exists(*claim3));
+ REQUIRE(!contentRepository->exists(*claim4));
+ }
}
}
diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp
new file mode 100644
index 000000000..d98d4a68e
--- /dev/null
+++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// loading extensions increases the baseline memory usage
+// as we measure the absolute memory usage that would fail this test
+#define EXTENSION_LIST ""
+
+#include <cstring>
+
+#include "utils/gsl.h"
+#include "utils/OsUtils.h"
+#include "../TestBase.h"
+#include "../Catch.h"
+#include "utils/Literals.h"
+#include "core/repository/FileSystemRepository.h"
+
+TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
+ TestController controller;
+ auto dir = controller.createTempDirectory();
+ auto fs_repo = std::make_shared<minifi::core::repository::FileSystemRepository>();
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
+ REQUIRE(fs_repo->initialize(config));
+ const auto start_memory = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+
+ auto content_session = fs_repo->createSession();
+ auto resource_id = content_session->create();
+ auto stream = content_session->write(resource_id);
+ size_t file_size = 20_MB;
+ gsl::span<const char> fragment = "well, hello there";
+ for (size_t i = 0; i < file_size / fragment.size() + 1; ++i) {
+ stream->write(fragment.as_span<const std::byte>());
+ }
+
+ const auto end_memory = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+
+ REQUIRE(gsl::narrow<size_t>(end_memory - start_memory) < 1_MB);
+}
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 26a9d1d68..f6df20829 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -115,11 +115,23 @@ TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[reado
}
TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly" "[appendsetsize]") {
- ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
- ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
-
- ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
- ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+ SECTION("Unmanaged") {
+ SECTION("VolatileContentRepository") {
+ ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+ }
+ SECTION("FileSystemRepository") {
+ ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+ }
+ }
+
+ SECTION("Managed") {
+ SECTION("VolatileContentRepository") {
+ ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::VolatileContentRepository>());
+ }
+ SECTION("FileSystemRepository") {
+ ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared<minifi::core::repository::FileSystemRepository>());
+ }
+ }
}
TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "[zerolengthread]") {