You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/11/10 13:25:36 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1380 - Batch
behavior for MergeContent and CompressContent
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ed523d4 MINIFICPP-1380 - Batch behavior for MergeContent and CompressContent
ed523d4 is described below
commit ed523d47b7f5db83bd5124466e122459dd340fb4
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue Sep 29 14:39:01 2020 +0200
MINIFICPP-1380 - Batch behavior for MergeContent and CompressContent
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #917
---
extensions/civetweb/processors/ListenHTTP.cpp | 2 +-
extensions/libarchive/BinFiles.cpp | 94 +++--
extensions/libarchive/BinFiles.h | 65 ++-
extensions/libarchive/CompressContent.cpp | 126 +++---
extensions/libarchive/CompressContent.h | 68 ++--
extensions/libarchive/MergeContent.cpp | 1 +
.../processors/ListenSyslog.cpp | 2 +-
.../processors/RetryFlowFile.cpp | 2 +-
libminifi/include/core/CachedValueValidator.h | 2 +-
libminifi/include/core/ProcessSession.h | 2 +
libminifi/include/core/Property.h | 10 +-
libminifi/include/core/PropertyValidation.h | 37 +-
libminifi/include/io/BufferStream.h | 5 +
libminifi/include/utils/Enum.h | 200 ++++++++++
libminifi/include/utils/StringUtils.h | 17 +
libminifi/src/core/ProcessSession.cpp | 3 +
libminifi/src/core/PropertyValidation.cpp | 6 +-
libminifi/test/Utils.h | 7 +-
.../test/archive-tests/CompressContentTests.cpp | 188 ++++++---
libminifi/test/archive-tests/MergeFileTests.cpp | 434 ++++++++-------------
libminifi/test/unit/EnumTests.cpp | 105 +++++
21 files changed, 842 insertions(+), 534 deletions(-)
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 39b7e8e..fa78527 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -38,7 +38,7 @@ core::Property ListenHTTP::Port(
core::PropertyBuilder::createProperty("Listening Port")
->withDescription("The Port to listen on for incoming connections. 0 means port is going to be selected randomly.")
->isRequired(true)
- ->withDefaultValue<int>(80, core::StandardValidators::LISTEN_PORT_VALIDATOR())->build());
+ ->withDefaultValue<int>(80, core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
" connections. If the Pattern does not match the DN, the connection will be refused.",
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index a6b349e..39aef68 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -38,12 +38,34 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property BinFiles::MinSize("Minimum Group Size", "The minimum size of for the bundle", "0");
-core::Property BinFiles::MaxSize("Maximum Group Size", "The maximum size for the bundle. If not specified, there is no maximum.", "");
-core::Property BinFiles::MinEntries("Minimum Number of Entries", "The minimum number of files to include in a bundle", "1");
-core::Property BinFiles::MaxEntries("Maximum Number of Entries", "The maximum number of files to include in a bundle. If not specified, there is no maximum.", "");
-core::Property BinFiles::MaxBinAge("Max Bin Age", "The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>", "");
-core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
+core::Property BinFiles::MinSize(
+ core::PropertyBuilder::createProperty("Minimum Group Size")
+ ->withDescription("The minimum size of for the bundle")
+ ->withDefaultValue<uint64_t>(0)->build());
+core::Property BinFiles::MaxSize(
+ core::PropertyBuilder::createProperty("Maximum Group Size")
+ ->withDescription("The maximum size for the bundle. If not specified, there is no maximum.")
+ ->withType(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR)->build());
+core::Property BinFiles::MinEntries(
+ core::PropertyBuilder::createProperty("Minimum Number of Entries")
+ ->withDescription("The minimum number of files to include in a bundle")
+ ->withDefaultValue<uint32_t>(1)->build());
+core::Property BinFiles::MaxEntries(
+ core::PropertyBuilder::createProperty("Maximum Number of Entries")
+ ->withDescription("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
+ ->withType(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR)->build());
+core::Property BinFiles::MaxBinAge(
+ core::PropertyBuilder::createProperty("Max Bin Age")
+ ->withDescription("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>")
+ ->withType(core::StandardValidators::get().TIME_PERIOD_VALIDATOR)->build());
+core::Property BinFiles::MaxBinCount(
+ core::PropertyBuilder::createProperty("Maximum number of Bins")
+ ->withDescription("Specifies the maximum number of bins that can be held in memory at any one time")
+ ->withDefaultValue<uint32_t>(100)->build());
+core::Property BinFiles::BatchSize(
+ core::PropertyBuilder::createProperty("Batch Size")
+ ->withDescription("Maximum number of FlowFiles processed in a single session")
+ ->withDefaultValue<uint32_t>(1)->build());
core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure");
core::Relationship BinFiles::Self("__self__", "Marks the FlowFile to be owned by this processor");
@@ -65,6 +87,7 @@ void BinFiles::initialize() {
properties.insert(MaxEntries);
properties.insert(MaxBinAge);
properties.insert(MaxBinCount);
+ properties.insert(BatchSize);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -74,41 +97,38 @@ void BinFiles::initialize() {
}
void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- std::string value;
- int64_t valInt64;
- int valInt;
- if (context->getProperty(MinSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt64)) {
- this->binManager_.setMinSize(valInt64);
- logger_->log_debug("BinFiles: MinSize [%" PRId64 "]", valInt64);
+ uint32_t val32;
+ uint64_t val64;
+ if (context->getProperty(MinSize.getName(), val64)) {
+ this->binManager_.setMinSize({val64});
+ logger_->log_debug("BinFiles: MinSize [%" PRId64 "]", val64);
}
- value = "";
- if (context->getProperty(MaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt64)) {
- this->binManager_.setMaxSize(valInt64);
- logger_->log_debug("BinFiles: MaxSize [%" PRId64 "]", valInt64);
+ if (context->getProperty(MaxSize.getName(), val64)) {
+ this->binManager_.setMaxSize({val64});
+ logger_->log_debug("BinFiles: MaxSize [%" PRId64 "]", val64);
}
- value = "";
- if (context->getProperty(MinEntries.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
- this->binManager_.setMinEntries(valInt);
- logger_->log_debug("BinFiles: MinEntries [%d]", valInt);
+ if (context->getProperty(MinEntries.getName(), val32)) {
+ this->binManager_.setMinEntries({val32});
+ logger_->log_debug("BinFiles: MinEntries [%" PRIu32 "]", val32);
}
- value = "";
- if (context->getProperty(MaxEntries.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
- this->binManager_.setMaxEntries(valInt);
- logger_->log_debug("BinFiles: MaxEntries [%d]", valInt);
+ if (context->getProperty(MaxEntries.getName(), val32)) {
+ this->binManager_.setMaxEntries({val32});
+ logger_->log_debug("BinFiles: MaxEntries [%" PRIu32 "]", val32);
}
- value = "";
- if (context->getProperty(MaxBinCount.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
- maxBinCount_ = valInt;
- logger_->log_debug("BinFiles: MaxBinCount [%d]", valInt);
+ if (context->getProperty(MaxBinCount.getName(), maxBinCount_)) {
+ logger_->log_debug("BinFiles: MaxBinCount [%" PRIu32 "]", maxBinCount_);
}
- value = "";
- if (context->getProperty(MaxBinAge.getName(), value) && !value.empty()) {
+ std::string maxBinAgeStr;
+ if (context->getProperty(MaxBinAge.getName(), maxBinAgeStr)) {
core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt64, unit) && core::Property::ConvertTimeUnitToMS(valInt64, unit, valInt64)) {
- this->binManager_.setBinAge(valInt64);
- logger_->log_debug("BinFiles: MaxBinAge [%" PRId64 "]", valInt64);
+ if (core::Property::StringToTime(maxBinAgeStr, val64, unit) && core::Property::ConvertTimeUnitToMS(val64, unit, val64)) {
+ this->binManager_.setBinAge({val64});
+ logger_->log_debug("BinFiles: MaxBinAge [%" PRIu64 "]", val64);
}
}
+ if (context->getProperty(BatchSize.getName(), batchSize_)) {
+ logger_->log_debug("BinFiles: BatchSize [%" PRIu32 "]", batchSize_);
+ }
}
void BinFiles::preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow) {
@@ -259,9 +279,13 @@ void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
}
}
- auto flow = session->get();
+ for (size_t i = 0; i < batchSize_; ++i) {
+ auto flow = session->get();
+
+ if (flow == nullptr) {
+ break;
+ }
- if (flow != nullptr) {
preprocessFlowFile(context.get(), session.get(), flow);
std::string groupId = getGroupId(context.get(), flow);
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index b7f7e6f..a3e594d 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -21,7 +21,7 @@
#define __BIN_FILES_H__
#include <cinttypes>
-#include <climits>
+#include <limits>
#include <deque>
#include <map>
#include "FlowFileRecord.h"
@@ -146,36 +146,23 @@ class Bin {
// BinManager Class
class BinManager {
public:
- // Constructor
- /*!
- * Create a new BinManager
- */
- BinManager()
- : minSize_(0),
- maxSize_(ULLONG_MAX),
- maxEntries_(INT_MAX),
- minEntries_(1),
- binAge_(ULLONG_MAX),
- binCount_(0),
- logger_(logging::LoggerFactory<BinManager>::getLogger()) {
- }
virtual ~BinManager() {
purge();
}
- void setMinSize(const uint64_t &size) {
- minSize_ = size;
+ void setMinSize(uint64_t size) {
+ minSize_ = {size};
}
- void setMaxSize(const uint64_t &size) {
- maxSize_ = size;
+ void setMaxSize(uint64_t size) {
+ maxSize_ = {size};
}
- void setMaxEntries(const int &entries) {
- maxEntries_ = entries;
+ void setMaxEntries(uint32_t entries) {
+ maxEntries_ = {entries};
}
- void setMinEntries(const int &entries) {
- minEntries_ = entries;
+ void setMinEntries(uint32_t entries) {
+ minEntries_ = {entries};
}
- void setBinAge(const uint64_t &age) {
- binAge_ = age;
+ void setBinAge(uint64_t age) {
+ binAge_ = {age};
}
int getBinCount() {
return binCount_;
@@ -201,17 +188,17 @@ class BinManager {
private:
std::mutex mutex_;
- uint64_t minSize_;
- uint64_t maxSize_;
- int maxEntries_;
- int minEntries_;
+ uint64_t minSize_{0};
+ uint64_t maxSize_{std::numeric_limits<decltype(maxSize_)>::max()};
+ uint32_t maxEntries_{std::numeric_limits<decltype(maxEntries_)>::max()};
+ uint32_t minEntries_{1};
std::string fileCount_;
// Bin Age in msec
- uint64_t binAge_;
+ uint64_t binAge_{std::numeric_limits<decltype(binAge_)>::max()};
std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>> >groupBinMap_;
std::deque<std::unique_ptr<Bin>> readyBin_;
- int binCount_;
- std::shared_ptr<logging::Logger> logger_;
+ int binCount_{0};
+ std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BinManager>::getLogger()};
};
// BinFiles Class
@@ -219,15 +206,7 @@ class BinFiles : public core::Processor {
protected:
static core::Relationship Self;
public:
- // Constructor
- /*!
- * Create a new processor
- */
- explicit BinFiles(std::string name, utils::Identifier uuid = utils::Identifier())
- : core::Processor(name, uuid),
- logger_(logging::LoggerFactory<BinFiles>::getLogger()) {
- maxBinCount_ = 100;
- }
+ using core::Processor::Processor;
// Destructor
virtual ~BinFiles() = default;
// Processor Name
@@ -239,6 +218,7 @@ class BinFiles : public core::Processor {
static core::Property MaxEntries;
static core::Property MaxBinCount;
static core::Property MaxBinAge;
+ static core::Property BatchSize;
// Supported Relationships
static core::Relationship Failure;
@@ -308,8 +288,9 @@ class BinFiles : public core::Processor {
std::unordered_set<std::shared_ptr<core::FlowFile>> incoming_files_;
};
- std::shared_ptr<logging::Logger> logger_;
- int maxBinCount_;
+ std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BinFiles>::getLogger()};
+ uint32_t batchSize_{1};
+ uint32_t maxBinCount_{100};
FlowFileStore file_store_;
};
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 4aa86fd..c0cb715 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -40,16 +40,13 @@ core::Property CompressContent::CompressLevel(
->isRequired(false)->withDefaultValue<int>(1)->build());
core::Property CompressContent::CompressMode(
core::PropertyBuilder::createProperty("Mode")->withDescription("Indicates whether the processor should compress content or decompress content.")
- ->isRequired(false)->withAllowableValues<std::string>({MODE_COMPRESS, MODE_DECOMPRESS})->withDefaultValue(MODE_COMPRESS)->build());
+ ->isRequired(false)->withAllowableValues(CompressionMode::values())
+ ->withDefaultValue(toString(CompressionMode::Compress))->build());
core::Property CompressContent::CompressFormat(
core::PropertyBuilder::createProperty("Compression Format")->withDescription("The compression format to use.")
->isRequired(false)
- ->withAllowableValues<std::string>({
- COMPRESSION_FORMAT_ATTRIBUTE,
- COMPRESSION_FORMAT_GZIP,
- COMPRESSION_FORMAT_BZIP2,
- COMPRESSION_FORMAT_XZ_LZMA2,
- COMPRESSION_FORMAT_LZMA})->withDefaultValue(COMPRESSION_FORMAT_ATTRIBUTE)->build());
+ ->withAllowableValues(ExtendedCompressionFormat::values())
+ ->withDefaultValue(toString(ExtendedCompressionFormat::USE_MIME_TYPE))->build());
core::Property CompressContent::UpdateFileName(
core::PropertyBuilder::createProperty("Update Filename")->withDescription("Determines if filename extension need to be updated")
->isRequired(false)->withDefaultValue<bool>(false)->build());
@@ -60,10 +57,29 @@ core::Property CompressContent::EncapsulateInTar(
"If false, on compression the content of the FlowFile simply gets compressed, and on decompression a simple compressed content is expected.\n"
"true is the behaviour compatible with older MiNiFi C++ versions, false is the behaviour compatible with NiFi.")
->isRequired(false)->withDefaultValue<bool>(true)->build());
+core::Property CompressContent::BatchSize(
+ core::PropertyBuilder::createProperty("Batch Size")
+ ->withDescription("Maximum number of FlowFiles processed in a single session")
+ ->withDefaultValue<uint32_t>(1)->build());
core::Relationship CompressContent::Success("success", "FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed");
core::Relationship CompressContent::Failure("failure", "FlowFiles will be transferred to the failure relationship if they fail to compress/decompress");
+const std::map<std::string, CompressContent::CompressionFormat> CompressContent::compressionFormatMimeTypeMap_{
+ {"application/gzip", CompressionFormat::GZIP},
+ {"application/bzip2", CompressionFormat::BZIP2},
+ {"application/x-bzip2", CompressionFormat::BZIP2},
+ {"application/x-lzma", CompressionFormat::LZMA},
+ {"application/x-xz", CompressionFormat::XZ_LZMA2}
+};
+
+const std::map<CompressContent::CompressionFormat, std::string> CompressContent::fileExtension_{
+ {CompressionFormat::GZIP, ".gz"},
+ {CompressionFormat::LZMA, ".lzma"},
+ {CompressionFormat::BZIP2, ".bz2"},
+ {CompressionFormat::XZ_LZMA2, ".xz"}
+};
+
void CompressContent::initialize() {
// Set the supported properties
std::set<core::Property> properties;
@@ -72,6 +88,7 @@ void CompressContent::initialize() {
properties.insert(CompressFormat);
properties.insert(UpdateFileName);
properties.insert(EncapsulateInTar);
+ properties.insert(BatchSize);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -81,39 +98,38 @@ void CompressContent::initialize() {
}
void CompressContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
- std::string value;
context->getProperty(CompressLevel.getName(), compressLevel_);
context->getProperty(CompressMode.getName(), compressMode_);
context->getProperty(CompressFormat.getName(), compressFormat_);
context->getProperty(UpdateFileName.getName(), updateFileName_);
context->getProperty(EncapsulateInTar.getName(), encapsulateInTar_);
+ context->getProperty(BatchSize.getName(), batchSize_);
logger_->log_info("Compress Content: Mode [%s] Format [%s] Level [%d] UpdateFileName [%d] EncapsulateInTar [%d]",
- compressMode_, compressFormat_, compressLevel_, updateFileName_, encapsulateInTar_);
-
- // update the mimeTypeMap
- compressionFormatMimeTypeMap_["application/gzip"] = COMPRESSION_FORMAT_GZIP;
- compressionFormatMimeTypeMap_["application/bzip2"] = COMPRESSION_FORMAT_BZIP2;
- compressionFormatMimeTypeMap_["application/x-bzip2"] = COMPRESSION_FORMAT_BZIP2;
- compressionFormatMimeTypeMap_["application/x-lzma"] = COMPRESSION_FORMAT_LZMA;
- compressionFormatMimeTypeMap_["application/x-xz"] = COMPRESSION_FORMAT_XZ_LZMA2;
- fileExtension_[COMPRESSION_FORMAT_GZIP] = ".gz";
- fileExtension_[COMPRESSION_FORMAT_LZMA] = ".lzma";
- fileExtension_[COMPRESSION_FORMAT_BZIP2] = ".bz2";
- fileExtension_[COMPRESSION_FORMAT_XZ_LZMA2] = ".xz";
+ compressMode_.toString(), compressFormat_.toString(), compressLevel_, updateFileName_, encapsulateInTar_);
}
void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- std::shared_ptr<core::FlowFile> flowFile = session->get();
-
- if (!flowFile) {
+ size_t processedFlowFileCount = 0;
+ for (; processedFlowFileCount < batchSize_; ++processedFlowFileCount) {
+ std::shared_ptr<core::FlowFile> flowFile = session->get();
+ if (!flowFile) {
+ break;
+ }
+ processFlowFile(flowFile, session);
+ }
+ if (processedFlowFileCount == 0) {
+ // we got no flowFiles
+ context->yield();
return;
}
+}
+void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session) {
session->remove(flowFile);
- std::string compressFormat = compressFormat_;
- if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) {
+ CompressionFormat compressFormat;
+ if (compressFormat_ == ExtendedCompressionFormat::USE_MIME_TYPE) {
std::string attr;
flowFile->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, attr);
if (attr.empty()) {
@@ -129,36 +145,24 @@ void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &con
session->transfer(flowFile, Success);
return;
}
- }
- std::transform(compressFormat.begin(), compressFormat.end(), compressFormat.begin(), ::tolower);
- std::string mimeType;
- if (compressFormat == COMPRESSION_FORMAT_GZIP) {
- mimeType = "application/gzip";
- } else if (compressFormat == COMPRESSION_FORMAT_BZIP2) {
- mimeType = "application/bzip2";
- } else if (compressFormat == COMPRESSION_FORMAT_LZMA) {
- mimeType = "application/x-lzma";
- } else if (compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) {
- mimeType = "application/x-xz";
} else {
- logger_->log_error("Compress format is invalid %s", compressFormat);
- session->transfer(flowFile, Failure);
- return;
+ compressFormat = compressFormat_.cast<CompressionFormat>();
}
+ std::string mimeType = toMimeType(compressFormat);
// Validate
- if (!encapsulateInTar_ && compressFormat != COMPRESSION_FORMAT_GZIP) {
+ if (!encapsulateInTar_ && compressFormat != CompressionFormat::GZIP) {
logger_->log_error("non-TAR encapsulated format only supports GZIP compression");
session->transfer(flowFile, Failure);
return;
}
- if (compressFormat == COMPRESSION_FORMAT_BZIP2 && archive_bzlib_version() == nullptr) {
- logger_->log_error("%s compression format is requested, but the agent was compiled without BZip2 support", compressFormat);
+ if (compressFormat == CompressionFormat::BZIP2 && archive_bzlib_version() == nullptr) {
+ logger_->log_error("%s compression format is requested, but the agent was compiled without BZip2 support", compressFormat.toString());
session->transfer(flowFile, Failure);
return;
}
- if ((compressFormat == COMPRESSION_FORMAT_LZMA || compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) && archive_liblzma_version() == nullptr) {
- logger_->log_error("%s compression format is requested, but the agent was compiled without LZMA support ", compressFormat);
+ if ((compressFormat == CompressionFormat::LZMA || compressFormat == CompressionFormat::XZ_LZMA2) && archive_liblzma_version() == nullptr) {
+ logger_->log_error("%s compression format is requested, but the agent was compiled without LZMA support ", compressFormat.toString());
session->transfer(flowFile, Failure);
return;
}
@@ -168,43 +172,53 @@ void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &con
if (search != fileExtension_.end()) {
fileExtension = search->second;
}
- std::shared_ptr<core::FlowFile> processFlowFile = session->create(flowFile);
+ std::shared_ptr<core::FlowFile> result = session->create(flowFile);
bool success = false;
if (encapsulateInTar_) {
CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
- session->write(processFlowFile, &callback);
+ session->write(result, &callback);
success = callback.status_ >= 0;
} else {
CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
- session->write(processFlowFile, &callback);
+ session->write(result, &callback);
success = callback.success_;
}
if (!success) {
logger_->log_error("Compress Content processing fail for the flow with UUID %s", flowFile->getUUIDStr());
session->transfer(flowFile, Failure);
- session->remove(processFlowFile);
+ session->remove(result);
} else {
std::string fileName;
- processFlowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
- if (compressMode_ == MODE_COMPRESS) {
- session->putAttribute(processFlowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
+ result->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
+ if (compressMode_ == CompressionMode::Compress) {
+ session->putAttribute(result, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
if (updateFileName_) {
fileName = fileName + fileExtension;
- session->putAttribute(processFlowFile, core::SpecialFlowAttribute::FILENAME, fileName);
+ session->putAttribute(result, core::SpecialFlowAttribute::FILENAME, fileName);
}
} else {
- session->removeAttribute(processFlowFile, core::SpecialFlowAttribute::MIME_TYPE);
+ session->removeAttribute(result, core::SpecialFlowAttribute::MIME_TYPE);
if (updateFileName_) {
if (fileName.size() >= fileExtension.size() && fileName.compare(fileName.size() - fileExtension.size(), fileExtension.size(), fileExtension) == 0) {
fileName = fileName.substr(0, fileName.size() - fileExtension.size());
- session->putAttribute(processFlowFile, core::SpecialFlowAttribute::FILENAME, fileName);
+ session->putAttribute(result, core::SpecialFlowAttribute::FILENAME, fileName);
}
}
}
- logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName);
- session->transfer(processFlowFile, Success);
+ logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", result->getUUIDStr(), fileName);
+ session->transfer(result, Success);
+ }
+}
+
+std::string CompressContent::toMimeType(CompressionFormat format) {
+ switch (format.value()) {
+ case CompressionFormat::GZIP: return "application/gzip";
+ case CompressionFormat::BZIP2: return "application/bzip2";
+ case CompressionFormat::LZMA: return "application/x-lzma";
+ case CompressionFormat::XZ_LZMA2: return "application/x-xz";
}
+ throw Exception(GENERAL_EXCEPTION, "Invalid compression format");
}
} /* namespace processors */
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 8cda1d6..3f87685 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -33,6 +33,7 @@
#include "core/Property.h"
#include "core/logging/LoggerConfiguration.h"
#include "io/ZlibStream.h"
+#include "utils/Enum.h"
namespace org {
namespace apache {
@@ -40,15 +41,6 @@ namespace nifi {
namespace minifi {
namespace processors {
-#define COMPRESSION_FORMAT_ATTRIBUTE "use mime.type attribute"
-#define COMPRESSION_FORMAT_GZIP "gzip"
-#define COMPRESSION_FORMAT_BZIP2 "bzip2"
-#define COMPRESSION_FORMAT_XZ_LZMA2 "xz-lzma2"
-#define COMPRESSION_FORMAT_LZMA "lzma"
-
-#define MODE_COMPRESS "compress"
-#define MODE_DECOMPRESS "decompress"
-
// CompressContent Class
class CompressContent: public core::Processor {
public:
@@ -72,11 +64,28 @@ public:
static core::Property CompressFormat;
static core::Property UpdateFileName;
static core::Property EncapsulateInTar;
+ static core::Property BatchSize;
// Supported Relationships
static core::Relationship Failure;
static core::Relationship Success;
+ SMART_ENUM(CompressionMode,
+ (Compress, "compress"),
+ (Decompress, "decompress")
+ )
+
+ SMART_ENUM(CompressionFormat,
+ (GZIP, "gzip"),
+ (LZMA, "lzma"),
+ (XZ_LZMA2, "xz-lzma2"),
+ (BZIP2, "bzip2")
+ )
+
+ SMART_ENUM_EXTEND(ExtendedCompressionFormat, CompressionFormat, (GZIP, LZMA, XZ_LZMA2, BZIP2),
+ (USE_MIME_TYPE, "use mime.type attribute")
+ )
+
public:
// Nest Callback Class for read stream from flow for compress
class ReadCallbackCompress: public InputStreamCallback {
@@ -125,7 +134,7 @@ public:
// Nest Callback Class for read stream from flow for decompress
class ReadCallbackDecompress: public InputStreamCallback {
public:
- ReadCallbackDecompress(std::shared_ptr<core::FlowFile> &flow) :
+ ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
read_size_(0), offset_(0), flow_(flow) {
origin_offset_ = flow_->getOffset();
}
@@ -149,8 +158,8 @@ public:
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
public:
- WriteCallback(std::string &compress_mode, int compress_level, std::string &compress_format,
- std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) :
+ WriteCallback(CompressionMode compress_mode, int compress_level, CompressionFormat compress_format,
+ const std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) :
compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format),
flow_(flow), session_(session),
logger_(logging::LoggerFactory<CompressContent>::getLogger()),
@@ -161,9 +170,9 @@ public:
}
~WriteCallback() = default;
- std::string compress_mode_;
+ CompressionMode compress_mode_;
int compress_level_;
- std::string compress_format_;
+ CompressionFormat compress_format_;
std::shared_ptr<core::FlowFile> flow_;
std::shared_ptr<core::ProcessSession> session_;
std::shared_ptr<io::BaseStream> stream_;
@@ -212,7 +221,7 @@ public:
struct archive *arch;
int r;
- if (compress_mode_ == MODE_COMPRESS) {
+ if (compress_mode_ == CompressionMode::Compress) {
arch = archive_write_new();
if (!arch) {
status_ = -1;
@@ -223,7 +232,7 @@ public:
archive_write_log_error_cleanup(arch);
return -1;
}
- if (compress_format_ == COMPRESSION_FORMAT_GZIP) {
+ if (compress_format_ == CompressionFormat::GZIP) {
r = archive_write_add_filter_gzip(arch);
if (r != ARCHIVE_OK) {
archive_write_log_error_cleanup(arch);
@@ -236,19 +245,19 @@ public:
archive_write_log_error_cleanup(arch);
return -1;
}
- } else if (compress_format_ == COMPRESSION_FORMAT_BZIP2) {
+ } else if (compress_format_ == CompressionFormat::BZIP2) {
r = archive_write_add_filter_bzip2(arch);
if (r != ARCHIVE_OK) {
archive_write_log_error_cleanup(arch);
return -1;
}
- } else if (compress_format_ == COMPRESSION_FORMAT_LZMA) {
+ } else if (compress_format_ == CompressionFormat::LZMA) {
r = archive_write_add_filter_lzma(arch);
if (r != ARCHIVE_OK) {
archive_write_log_error_cleanup(arch);
return -1;
}
- } else if (compress_format_ == COMPRESSION_FORMAT_XZ_LZMA2) {
+ } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
r = archive_write_add_filter_xz(arch);
if (r != ARCHIVE_OK) {
archive_write_log_error_cleanup(arch);
@@ -346,7 +355,7 @@ public:
class GzipWriteCallback : public OutputStreamCallback {
public:
- GzipWriteCallback(std::string compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
+ GzipWriteCallback(CompressionMode compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
: logger_(logging::LoggerFactory<CompressContent>::getLogger())
, compress_mode_(std::move(compress_mode))
, compress_level_(compress_level)
@@ -355,7 +364,7 @@ public:
}
std::shared_ptr<logging::Logger> logger_;
- std::string compress_mode_;
+ CompressionMode compress_mode_;
int compress_level_;
std::shared_ptr<core::FlowFile> flow_;
std::shared_ptr<core::ProcessSession> session_;
@@ -394,7 +403,7 @@ public:
};
std::shared_ptr<io::ZlibBaseStream> filterStream;
- if (compress_mode_ == MODE_COMPRESS) {
+ if (compress_mode_ == CompressionMode::Compress) {
filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
} else {
filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP);
@@ -424,17 +433,20 @@ public:
// Initialize, over write by NiFi CompressContent
virtual void initialize(void);
-protected:
-
private:
+ static std::string toMimeType(CompressionFormat format);
+
+ void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session);
+
std::shared_ptr<logging::Logger> logger_;
int compressLevel_;
- std::string compressMode_;
- std::string compressFormat_;
+ CompressionMode compressMode_;
+ ExtendedCompressionFormat compressFormat_;
bool updateFileName_;
bool encapsulateInTar_;
- std::map<std::string, std::string> compressionFormatMimeTypeMap_;
- std::map<std::string, std::string> fileExtension_;
+ uint32_t batchSize_{1};
+ static const std::map<std::string, CompressionFormat> compressionFormatMimeTypeMap_;
+ static const std::map<CompressionFormat, std::string> fileExtension_;
};
REGISTER_RESOURCE(CompressContent, "Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate");
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index e702eb5..ecf0407 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -89,6 +89,7 @@ void MergeContent::initialize() {
properties.insert(MaxEntries);
properties.insert(MaxBinAge);
properties.insert(MaxBinCount);
+ properties.insert(BatchSize);
properties.insert(MergeStrategy);
properties.insert(MergeFormat);
properties.insert(CorrelationAttributeName);
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index 692c39f..d271d5b 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -64,7 +64,7 @@ core::Property ListenSyslog::Protocol(
"UDP")->build());
core::Property ListenSyslog::Port(
- core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::PORT_VALIDATOR())->build());
+ core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::get().PORT_VALIDATOR)->build());
core::Relationship ListenSyslog::Success("success", "All files are routed to success");
core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
diff --git a/extensions/standard-processors/processors/RetryFlowFile.cpp b/extensions/standard-processors/processors/RetryFlowFile.cpp
index de235e8..cd88e45 100644
--- a/extensions/standard-processors/processors/RetryFlowFile.cpp
+++ b/extensions/standard-processors/processors/RetryFlowFile.cpp
@@ -31,7 +31,7 @@ core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProper
"The name of the attribute that contains the current retry count for the FlowFile."
"WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
"the processor will either overwrite that attribute with '1' or fail based on configuration.")
- ->withDefaultValue("flowfile.retries", core::StandardValidators::NON_BLANK_VALIDATOR())
+ ->withDefaultValue("flowfile.retries", core::StandardValidators::get().NON_BLANK_VALIDATOR)
->supportsExpressionLanguage(true)
->isRequired(true)
->build());
diff --git a/libminifi/include/core/CachedValueValidator.h b/libminifi/include/core/CachedValueValidator.h
index 1076027..9102248 100644
--- a/libminifi/include/core/CachedValueValidator.h
+++ b/libminifi/include/core/CachedValueValidator.h
@@ -112,7 +112,7 @@ class CachedValueValidator {
validation_result_ = Result::RECOMPUTE;
}
- gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::VALID_VALIDATOR()};
+ gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::get().VALID_VALIDATOR};
mutable Result validation_result_{Result::RECOMPUTE};
};
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index c429570..60ddafc 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -109,6 +109,8 @@ class ProcessSession : public ReferenceContainer {
* @param flow flow file
*/
void importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow);
+ void importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow);
+
// import from the data source.
void import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0);
DEPRECATED(/*deprecated in*/ 0.7.0, /*will remove in */ 2.0) void import(std::string source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter); // NOLINT
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index d832da4..44719bd 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -355,7 +355,7 @@ class Property {
validator_ = StandardValidators::getValidator(ret.getValue());
} else {
ret = value;
- validator_ = StandardValidators::VALID_VALIDATOR();
+ validator_ = StandardValidators::get().VALID_VALIDATOR;
}
return ret;
}
@@ -369,7 +369,7 @@ class Property {
bool is_collection_;
PropertyValue default_value_;
std::vector<PropertyValue> values_;
- gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::VALID_VALIDATOR()};
+ gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::get().VALID_VALIDATOR};
std::string display_name_;
std::vector<PropertyValue> allowed_values_;
// types represents the allowable types for this property
@@ -432,6 +432,12 @@ class PropertyBuilder : public std::enable_shared_from_this<PropertyBuilder> {
return shared_from_this();
}
+ std::shared_ptr<PropertyBuilder> withType(const std::shared_ptr<PropertyValidator> &validator) {
+ prop.validator_ = gsl::make_not_null(validator);
+ prop.default_value_.setValidator(gsl::make_not_null(validator));
+ return shared_from_this();
+ }
+
template<typename T>
std::shared_ptr<ConstrainedProperty<T>> withAllowableValue(const T& df) {
auto property = std::make_shared<ConstrainedProperty<T>>(shared_from_this());
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index 800a3f5..3583749 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -330,9 +330,15 @@ class TimePeriodValidator : public PropertyValidator {
// STATIC DEFINITIONS
class StandardValidators {
+ StandardValidators();
+
public:
- static const gsl::not_null<std::shared_ptr<PropertyValidator>> &getValidator(const std::shared_ptr<minifi::state::response::Value> &input) {
+ static const StandardValidators& get() {
static StandardValidators init;
+ return init;
+ }
+ static const gsl::not_null<std::shared_ptr<PropertyValidator>> &getValidator(const std::shared_ptr<minifi::state::response::Value> &input) {
+ const StandardValidators& init = get();
if (std::dynamic_pointer_cast<core::DataSizeValue>(input) != nullptr) {
return init.DATA_SIZE_VALIDATOR;
} else if (std::dynamic_pointer_cast<core::TimePeriodValue>(input) != nullptr) {
@@ -348,31 +354,11 @@ class StandardValidators {
} else if (std::dynamic_pointer_cast<minifi::state::response::UInt64Value>(input) != nullptr) {
return init.UNSIGNED_LONG_VALIDATOR;
} else {
- return org::apache::nifi::minifi::core::StandardValidators::VALID_VALIDATOR();
+ return init.VALID_VALIDATOR;
}
}
- static const gsl::not_null<std::shared_ptr<PropertyValidator>>& NON_BLANK_VALIDATOR() {
- static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<NonBlankValidator>("NON_BLANK_VALIDATOR"));
- return validator;
- }
-
- static const gsl::not_null<std::shared_ptr<PropertyValidator>>& VALID_VALIDATOR() {
- static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<AlwaysValid>(true, "VALID"));
- return validator;
- }
-
- static gsl::not_null<std::shared_ptr<PropertyValidator>> PORT_VALIDATOR() {
- static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<PortValidator>("PORT_VALIDATOR"));
- return validator;
- }
-
- static gsl::not_null<std::shared_ptr<PropertyValidator>> LISTEN_PORT_VALIDATOR() {
- static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<ListenPortValidator>("PORT_VALIDATOR"));
- return validator;
- }
-
- private:
+ public:
gsl::not_null<std::shared_ptr<PropertyValidator>> INVALID;
gsl::not_null<std::shared_ptr<PropertyValidator>> INTEGER_VALIDATOR;
gsl::not_null<std::shared_ptr<PropertyValidator>> UNSIGNED_INT_VALIDATOR;
@@ -382,7 +368,10 @@ class StandardValidators {
gsl::not_null<std::shared_ptr<PropertyValidator>> DATA_SIZE_VALIDATOR;
gsl::not_null<std::shared_ptr<PropertyValidator>> TIME_PERIOD_VALIDATOR;
- StandardValidators();
+ gsl::not_null<std::shared_ptr<PropertyValidator>> NON_BLANK_VALIDATOR;
+ gsl::not_null<std::shared_ptr<PropertyValidator>> VALID_VALIDATOR;
+ gsl::not_null<std::shared_ptr<PropertyValidator>> PORT_VALIDATOR;
+ gsl::not_null<std::shared_ptr<PropertyValidator>> LISTEN_PORT_VALIDATOR;
};
} // namespace core
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 34b0430..c393abb 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -21,6 +21,7 @@
#include <iostream>
#include <cstdint>
#include <vector>
+#include <string>
#include "BaseStream.h"
namespace org {
@@ -37,6 +38,10 @@ class BufferStream : public BaseStream {
write(buf, len);
}
+ explicit BufferStream(const std::string& data) {
+ write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+ }
+
using BaseStream::read;
using BaseStream::write;
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
new file mode 100644
index 0000000..440b158
--- /dev/null
+++ b/libminifi/include/utils/Enum.h
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <cstring>
+#include <set>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+#define COMMA(...) ,
+#define MSVC_HACK(x) x
+
+#define PICK_(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, ...) _15
+#define COUNT(...) \
+ MSVC_HACK(PICK_(__VA_ARGS__, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0))
+
+#define CONCAT_(a, b) a ## b
+#define CONCAT(a, b) CONCAT_(a, b)
+
+#define CALL(Fn, ...) MSVC_HACK(Fn(__VA_ARGS__))
+#define SPREAD(...) __VA_ARGS__
+
+#define FOR_EACH(fn, delim, ARGS) \
+ CALL(CONCAT(FOR_EACH_, COUNT ARGS), fn, delim, SPREAD ARGS)
+
+#define FOR_EACH_0(...)
+#define FOR_EACH_1(fn, delim, _1) fn(_1)
+#define FOR_EACH_2(fn, delim, _1, _2) fn(_1) delim() fn(_2)
+#define FOR_EACH_3(fn, delim, _1, _2, _3) fn(_1) delim() fn(_2) delim() fn(_3)
+#define FOR_EACH_4(fn, delim, _1, _2, _3, _4) \
+ fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4)
+#define FOR_EACH_5(fn, delim, _1, _2, _3, _4, _5) \
+ fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() fn(_5)
+#define FOR_EACH_6(fn, delim, _1, _2, _3, _4, _5, _6) \
+ fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() \
+ fn(_5) delim() fn(_6)
+#define FOR_EACH_7(fn, delim, _1, _2, _3, _4, _5, _6, _7) \
+ fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() \
+ fn(_5) delim() fn(_6) delim() fn(_7)
+#define FOR_EACH_8(fn, delim, _1, _2, _3, _4, _5, _6, _7, _8) \
+ fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() \
+ fn(_5) delim() fn(_6) delim() fn(_7) delim() fn(_8)
+
+#define FIRST_(a, b) a
+#define FIRST(x, ...) FIRST_ x
+#define SECOND_(a, b) b
+#define SECOND(x, ...) SECOND_ x
+#define NOTHING()
+
+#define INCLUDE_BASE_FIELD(x) \
+ x = Base::x
+
+#define SMART_ENUM_BODY(Clazz, ...) \
+ constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
+ Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+ Clazz(const char* str) : value_{parse(str).value_} {} \
+ private: \
+ Type value_; \
+ public: \
+ Type value() const { \
+ return value_; \
+ } \
+ struct detail : Base::detail { \
+ static std::set<std::string> values() { \
+ static constexpr const char* ownValues[]{ \
+ FOR_EACH(SECOND, COMMA, (__VA_ARGS__)) \
+ }; \
+ std::set<std::string> values = Base::detail::values(); \
+ for (auto value : ownValues) { \
+ values.emplace(value); \
+ } \
+ return values; \
+ } \
+ static const char* toStringImpl(Type a, const char* DerivedName) { \
+ static constexpr const char* values[]{ \
+ FOR_EACH(SECOND, COMMA, (__VA_ARGS__)) \
+ }; \
+ int index = static_cast<int>(a); \
+ if (Base::length <= index && index < length) { \
+ return values[index - Base::length]; \
+ } \
+ return Base::detail::toStringImpl(static_cast<Base::Type>(a), DerivedName); \
+ } \
+ }; \
+ static constexpr int length = Base::length + COUNT(__VA_ARGS__); \
+ friend const char* toString(Type a) { \
+ return detail::toStringImpl(a, #Clazz); \
+ } \
+ const char* toString() const { \
+ return detail::toStringImpl(value_, #Clazz); \
+ } \
+ static std::set<std::string> values() { \
+ return detail::values(); \
+ } \
+ bool operator==(Type val) const { \
+ return value_ == val; \
+ } \
+ bool operator!=(Type val) const { \
+ return value_ != val; \
+ } \
+ bool operator==(const Clazz& other) const { \
+ return value_ == other.value_; \
+ } \
+ bool operator!=(const Clazz& other) const { \
+ return value_ != other.value_; \
+ } \
+ bool operator<(const Clazz& other) const { \
+ return value_ < other.value_;\
+ } \
+ explicit operator bool() const { \
+ int idx = static_cast<int>(value_); \
+ return 0 <= idx && idx < length; \
+ } \
+ static Clazz parse(const char* str, bool throw_on_invalid = true) { \
+ for (int idx = 0; idx < length; ++idx) { \
+ if (std::strcmp(str, detail::toStringImpl(static_cast<Type>(idx), #Clazz)) == 0) \
+ return static_cast<Type>(idx); \
+ } \
+ if (throw_on_invalid) { \
+ throw std::runtime_error(std::string("Cannot convert \"") + str + "\" to " #Clazz); \
+ } \
+ return {}; \
+ } \
+ template<typename T, typename = typename std::enable_if<std::is_base_of<typename T::detail, detail>::value>::type> \
+ T cast() const { \
+ if (0 <= value_ && value_ < T::length) { \
+ return static_cast<typename T::Type>(value_); \
+ } \
+ return {}; \
+ }
+
+/**
+ * These macros provide an encapsulation of enum-like behavior offering the following:
+ * - switch safety: the compiler can detect if some enum cases are not handled
+ * - string conversion: convert between enum instances and their string representations (toString, parse)
+ * - validity: check if it contains a value that is an invalid enum value
+ * - extensibility: extend an enum with new values and safely* cast from derived to base
+ * (* "safely" here means that there is no casting between unrelated enums)
+ * - reflection: access the set of all string representations
+ */
+
+#define SMART_ENUM(Clazz, ...) \
+ struct Clazz { \
+ using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
+ enum Type { \
+ FOR_EACH(FIRST, COMMA, (__VA_ARGS__)) \
+ }; \
+ SMART_ENUM_BODY(Clazz, __VA_ARGS__) \
+ };
+
+#define SMART_ENUM_EXTEND(Clazz, base, base_fields, ...) \
+ struct Clazz { \
+ using Base = base; \
+ enum Type { \
+ FOR_EACH(INCLUDE_BASE_FIELD, COMMA, base_fields), \
+ FOR_EACH(FIRST, COMMA, (__VA_ARGS__)) \
+ }; \
+ static_assert((COUNT base_fields) == Base::length, "Must enumerate all base instance values"); \
+ SMART_ENUM_BODY(Clazz, __VA_ARGS__) \
+ };
+
+struct EnumBase {
+ enum Type {};
+ static constexpr int length = 0;
+ struct detail {
+ static std::set<std::string> values() {
+ return {};
+ }
+ static const char* toStringImpl(Type a, const char* DerivedName) {
+ throw std::runtime_error(std::string("Cannot stringify unknown instance in enum \"") + DerivedName + "\" : \""
+ + std::to_string(static_cast<int>(a)) + "\"");
+ }
+ };
+};
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 1a373b5..2143247 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -331,6 +331,23 @@ class StringUtils {
static bool from_hex(uint8_t ch, uint8_t& output);
/**
+ * Creates a string that is a concatenation of count instances of the provided string.
+ * @tparam TChar char type of the string (char or wchar_t)
+ * @param str that is to be repeated
+ * @param count the number of times the string is repeated
+ * @return the result string
+ */
+ template<typename TChar>
+ static std::basic_string<TChar> repeat(const TChar* str, size_t count) {
+ return repeat(std::basic_string<TChar>(str), count);
+ }
+
+ template<typename TChar>
+ static std::basic_string<TChar> repeat(const std::basic_string<TChar>& str, size_t count) {
+ return join("", std::vector<std::basic_string<TChar>>(count, str));
+ }
+
+ /**
* Hexdecodes the hexencoded string in data, ignoring every character that is not [0-9a-fA-F]
* @param data the output buffer where the hexdecoded bytes will be written. Must be at least length / 2 bytes long.
* @param data_length pointer to the length of data the data buffer. It will be filled with the length of the decoded bytes.
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index dc002f0..6a8273a 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -315,6 +315,9 @@ int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStrea
}
}
+void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) {
+ importFrom(stream, flow);
+}
/**
* Imports a file from the data stream
* @param stream incoming data stream that contains the data to store into a file
diff --git a/libminifi/src/core/PropertyValidation.cpp b/libminifi/src/core/PropertyValidation.cpp
index 304aa39..1be431c 100644
--- a/libminifi/src/core/PropertyValidation.cpp
+++ b/libminifi/src/core/PropertyValidation.cpp
@@ -33,7 +33,11 @@ StandardValidators::StandardValidators()
UNSIGNED_LONG_VALIDATOR(std::make_shared<UnsignedLongValidator>("LONG_VALIDATOR")),
DATA_SIZE_VALIDATOR(std::make_shared<DataSizeValidator>("DATA_SIZE_VALIDATOR")),
TIME_PERIOD_VALIDATOR(std::make_shared<TimePeriodValidator>("TIME_PERIOD_VALIDATOR")),
- BOOLEAN_VALIDATOR(std::make_shared<BooleanValidator>("BOOLEAN_VALIDATOR")) {}
+ BOOLEAN_VALIDATOR(std::make_shared<BooleanValidator>("BOOLEAN_VALIDATOR")),
+ NON_BLANK_VALIDATOR(std::make_shared<NonBlankValidator>("NON_BLANK_VALIDATOR")),
+ VALID_VALIDATOR(std::make_shared<AlwaysValid>(true, "VALID")),
+ PORT_VALIDATOR(std::make_shared<PortValidator>("PORT_VALIDATOR")),
+ LISTEN_PORT_VALIDATOR(std::make_shared<ListenPortValidator>("PORT_VALIDATOR")) {}
} /* namespace core */
} /* namespace minifi */
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 9796bbd..228a135 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_TEST_UTILS_H_
-#define LIBMINIFI_TEST_UTILS_H_
+#pragma once
+
+#include <string>
#define FIELD_ACCESSOR(field) \
template<typename T> \
@@ -28,5 +29,3 @@
static auto call_##method(T&& instance, Args&& ...args) -> decltype((std::forward<T>(instance).method(std::forward<Args>(args)...))) { \
return std::forward<T>(instance).method(std::forward<Args>(args)...); \
}
-
-#endif // LIBMINIFI_TEST_UTILS_H_
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 446b71b..76507df 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -40,8 +40,9 @@
#include "processors/LogAttribute.h"
#include "processors/PutFile.h"
#include "utils/file/FileUtils.h"
+#include "../Utils.h"
-class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+class ReadCallback: public minifi::InputStreamCallback {
public:
explicit ReadCallback(size_t size) :
read_size_(0) {
@@ -56,7 +57,7 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
if (archive_buffer_)
delete[] archive_buffer_;
}
- int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
int64_t total_read = 0;
int64_t ret = 0;
do {
@@ -108,24 +109,24 @@ class CompressDecompressionTestController : public TestController{
}
void setupFlow() {
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<processors::CompressContent>();
+ LogTestController::getInstance().setTrace<processors::LogAttribute>();
LogTestController::getInstance().setTrace<core::ProcessSession>();
LogTestController::getInstance().setTrace<core::ProcessContext>();
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+ LogTestController::getInstance().setTrace<minifi::Connection>();
+ LogTestController::getInstance().setTrace<minifi::core::Connectable>();
+ LogTestController::getInstance().setTrace<minifi::io::FileStream>();
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+ processor = std::make_shared<processors::CompressContent>("compresscontent");
processor->initialize();
utils::Identifier processoruuid = processor->getUUID();
REQUIRE(processoruuid);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ content_repo->initialize(std::make_shared<minifi::Configure>());
// connection from compress processor to log attribute
output = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
output->addRelationship(core::Relationship("success", "compress successful output"));
@@ -206,8 +207,8 @@ class CompressTestController : public CompressDecompressionTestController {
public:
CompressTestController() {
- char format[] = "/tmp/test.XXXXXX";
- tempDir_ = get_global_controller().createTempDirectory(format);
+ char CompressionFormat[] = "/tmp/test.XXXXXX";
+ tempDir_ = get_global_controller().createTempDirectory(CompressionFormat);
REQUIRE(!tempDir_.empty());
raw_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-expect-compresscontent.txt");
compressed_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-compresscontent");
@@ -234,11 +235,14 @@ class DecompressTestController : public CompressDecompressionTestController{
}
};
+using CompressionFormat = processors::CompressContent::ExtendedCompressionFormat;
+using CompressionMode = processors::CompressContent::CompressionMode;
+
TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -260,7 +264,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+ flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/gzip");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
@@ -273,10 +277,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1
}
TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfiletest2]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -298,7 +302,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+ REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string content(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -307,10 +311,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet
}
TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::BZIP2));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -332,7 +336,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+ flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/bzip2");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
@@ -346,10 +350,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3
TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfiletest4]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::BZIP2));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -371,7 +375,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+ REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -380,10 +384,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet
}
TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::LZMA));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -411,7 +415,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+ flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/x-lzma");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
@@ -425,15 +429,15 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5
TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfiletest6]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::USE_MIME_TYPE));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
sessionGenFlowFile.import(compressedPath(), flow, true, 0);
- flow->setAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma");
+ flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma");
sessionGenFlowFile.flushContent();
input->put(flow);
@@ -457,7 +461,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+ REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -466,10 +470,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet
}
TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletest7]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::XZ_LZMA2));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -497,7 +501,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+ flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/x-xz");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
@@ -511,15 +515,15 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes
TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfiletest8]") {
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::USE_MIME_TYPE));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
sessionGenFlowFile.import(compressedPath(), flow, true, 0);
- flow->setAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz");
+ flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz");
sessionGenFlowFile.flushContent();
input->put(flow);
@@ -543,7 +547,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil
{
REQUIRE(flow1->getSize() != flow->getSize());
std::string mime;
- REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+ REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -552,8 +556,8 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil
}
TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfiletest8]") {
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
+ LogTestController::getInstance().setTrace<processors::CompressContent>();
+ LogTestController::getInstance().setTrace<processors::PutFile>();
// Create temporary directories
char format_src[] = "/tmp/archives.XXXXXX";
@@ -599,8 +603,8 @@ TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfi
plan->setProperty(get_file, "Input Directory", src_dir);
// Configure CompressContent processor for compression
- plan->setProperty(compress_content, "Mode", MODE_COMPRESS);
- plan->setProperty(compress_content, "Compression Format", COMPRESSION_FORMAT_GZIP);
+ plan->setProperty(compress_content, "Mode", toString(CompressionMode::Compress));
+ plan->setProperty(compress_content, "Compression Format", toString(CompressionFormat::GZIP));
plan->setProperty(compress_content, "Update Filename", "true");
plan->setProperty(compress_content, "Encapsulate in TAR", "false");
@@ -608,8 +612,8 @@ TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfi
plan->setProperty(put_compressed, "Directory", dst_dir);
// Configure CompressContent processor for decompression
- plan->setProperty(decompress_content, "Mode", MODE_DECOMPRESS);
- plan->setProperty(decompress_content, "Compression Format", COMPRESSION_FORMAT_GZIP);
+ plan->setProperty(decompress_content, "Mode", toString(CompressionMode::Decompress));
+ plan->setProperty(decompress_content, "Compression Format", toString(CompressionFormat::GZIP));
plan->setProperty(decompress_content, "Update Filename", "true");
plan->setProperty(decompress_content, "Encapsulate in TAR", "false");
@@ -653,3 +657,69 @@ TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfi
LogTestController::getInstance().reset();
}
+
+TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFileBatchTest]") {
+ std::vector<std::string> flowFileContents{
+ utils::StringUtils::repeat("0", 1000), utils::StringUtils::repeat("1", 1000),
+ utils::StringUtils::repeat("2", 1000), utils::StringUtils::repeat("3", 1000),
+ };
+ const std::size_t batchSize = 3;
+
+ context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+ context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+ context->setProperty(processors::CompressContent::CompressLevel, "9");
+ context->setProperty(processors::CompressContent::UpdateFileName, "true");
+ context->setProperty(processors::CompressContent::BatchSize, std::to_string(batchSize));
+
+
+ core::ProcessSession sessionGenFlowFile(context);
+ for (const auto& content : flowFileContents) {
+ auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(content), flow);
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
+ }
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+
+ // Trigger once to process batchSize
+ {
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);
+ session->commit();
+ }
+
+ // validate the compress content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::vector<std::shared_ptr<core::FlowFile>> outFiles;
+ while (std::shared_ptr<core::FlowFile> file = output->poll(expiredFlowRecords)) {
+ outFiles.push_back(std::move(file));
+ }
+ REQUIRE(outFiles.size() == batchSize);
+
+ // Trigger a second time to process the remaining files
+ {
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);
+ session->commit();
+ }
+
+ while (std::shared_ptr<core::FlowFile> file = output->poll(expiredFlowRecords)) {
+ outFiles.push_back(std::move(file));
+ }
+ REQUIRE(outFiles.size() == flowFileContents.size());
+
+ for (std::size_t idx = 0; idx < outFiles.size(); ++idx) {
+ auto file = outFiles[idx];
+ std::string mime;
+ file->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
+ REQUIRE(mime == "application/gzip");
+ ReadCallback callback(gsl::narrow<size_t>(file->getSize()));
+ sessionGenFlowFile.read(file, &callback);
+ callback.archive_read();
+ std::string content(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+ REQUIRE(flowFileContents[idx] == content);
+ }
+}
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index d8d2bed..384f9f1 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -38,6 +38,7 @@
#include "../unit/ProvenanceTestHelper.h"
#include "serialization/FlowFileV3Serializer.h"
#include "serialization/PayloadSerializer.h"
+#include "../Utils.h"
std::string FLOW_FILE;
std::string EXPECT_MERGE_CONTENT_FIRST;
@@ -63,7 +64,7 @@ void init_file_paths() {
static Initializer initializer;
}
-class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
+class FixedBuffer : public minifi::InputStreamCallback {
public:
explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
buf_.reset(new uint8_t[capacity_]);
@@ -95,7 +96,7 @@ class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
} while (size_ != capacity_);
return total_read;
}
- int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
return write(*stream.get(), capacity_);
}
@@ -182,11 +183,16 @@ class MergeTestController : public TestController {
logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor), nullptr, repo, repo, content_repo);
+
+ for (size_t i = 0; i < 6; ++i) {
+ flowFileContents[i] = utils::StringUtils::repeat(std::to_string(i), 32);
+ }
}
~MergeTestController() {
LogTestController::getInstance().reset();
}
+ std::string flowFileContents[6];
std::shared_ptr<core::ProcessContext> context;
std::shared_ptr<core::Processor> processor;
std::shared_ptr<minifi::Connection> input;
@@ -194,34 +200,20 @@ class MergeTestController : public TestController {
};
TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 6; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- if (i < 3)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
- }
+ std::string expected[2]{
+ flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+ flowFileContents[3] + flowFileContents[4] + flowFileContents[5]
+ };
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
core::ProcessSession sessionGenFlowFile(context);
- // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+ // Generate 6 flowfiles, first three merged to one, second three merged to one
for (const int i : {0, 2, 5, 4, 1, 3}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
// three bundle
if (i < 3)
flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
@@ -251,67 +243,38 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]")
{
FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
- std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[0]);
}
REQUIRE(flow2->getSize() == 96);
{
FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
sessionGenFlowFile.read(flow2, &callback);
- std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[1]);
}
}
TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefiletest2]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
- std::ofstream headerfile(HEADER_FILE, std::ios::binary);
- std::ofstream footerfile(FOOTER_FILE, std::ios::binary);
- std::ofstream demarcatorfile(DEMARCATOR_FILE, std::ios::binary);
- headerfile << "header";
- expectfileFirst << "header";
- expectfileSecond << "header";
- footerfile << "footer";
- demarcatorfile << "demarcator";
-
-
- // Create and write to the test file
- for (int i = 0; i < 6; i++) {
- if (i != 0 && i <= 2)
- expectfileFirst << "demarcator";
- if (i != 3 && i >= 4)
- expectfileSecond << "demarcator";
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- if (i < 3)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
- expectfileFirst << "footer";
- expectfileSecond << "footer";
- }
+ std::string expected[2]{
+ "header" + flowFileContents[0] + "demarcator" + flowFileContents[1] + "demarcator" + flowFileContents[2] + "footer",
+ "header" + flowFileContents[3] + "demarcator" + flowFileContents[4] + "demarcator" + flowFileContents[5] + "footer"
+ };
+
+ std::ofstream(HEADER_FILE, std::ios::binary) << "header";
+ std::ofstream(FOOTER_FILE, std::ios::binary) << "footer";
+ std::ofstream(DEMARCATOR_FILE, std::ios::binary) << "demarcator";
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_FILENAME);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_FILENAME);
+ context->setProperty(processors::MergeContent::Header, HEADER_FILE);
+ context->setProperty(processors::MergeContent::Footer, FOOTER_FILE);
+ context->setProperty(processors::MergeContent::Demarcator, DEMARCATOR_FILE);
core::ProcessSession sessionGenFlowFile(context);
- // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+ // Generate 6 flowfiles, first three merged to one, second three merged to one
for (const int i : {0, 2, 5, 4, 1, 3}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
// three bundle
if (i < 3)
flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
@@ -342,52 +305,33 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefil
{
FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
- std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[0]);
}
REQUIRE(flow2->getSize() == 128);
{
FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
sessionGenFlowFile.read(flow2, &callback);
- std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[1]);
}
}
TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefiletest3]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
- // Create and write to the test file, drop record 4
- for (int i = 0; i < 6; i++) {
- if (i == 4)
- continue;
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- if (i < 3)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
- }
+ // drop record 4
+ std::string expected[2]{
+ flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+ flowFileContents[3] + flowFileContents[5]
+ };
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MaxBinAge, "1 sec");
core::ProcessSession sessionGenFlowFile(context);
// Generate 5 flowfiles, first threes merged to one, the other two merged to one
for (const int i : {0, 2, 5, 1, 3}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
// three bundle
if (i < 3)
flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
@@ -424,51 +368,33 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefile
{
FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
- std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[0]);
}
REQUIRE(flow2->getSize() == 64);
{
FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
sessionGenFlowFile.read(flow2, &callback);
- std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[1]);
}
}
TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 6; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- if (i < 3)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
- }
+ std::string expected[2]{
+ flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+ flowFileContents[3] + flowFileContents[4] + flowFileContents[5]
+ };
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MinSize, "96");
+ context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
for (const int i : {0, 1, 2, 3, 4, 5}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
flow->setAttribute("tag", "tag");
sessionGenFlowFile.flushContent();
input->put(flow);
@@ -490,52 +416,29 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
{
FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
- std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[0]);
}
REQUIRE(flow2->getSize() == 96);
{
FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
sessionGenFlowFile.read(flow2, &callback);
- std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[1]);
}
}
TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 6; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- if (i < 3)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
- }
-
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MinSize, "96");
+ context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
for (const int i : {0, 1, 2, 3, 4, 5}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
flow->setAttribute("tag", "tag");
sessionGenFlowFile.flushContent();
input->put(flow);
@@ -560,10 +463,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
auto archives = read_archives(callback);
REQUIRE(archives.size() == 3);
for (int i = 0; i < 3; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ifstream file1(flowFileName, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(archives[i].to_string() == contents);
+ REQUIRE(archives[i].to_string() == flowFileContents[i]);
}
}
REQUIRE(flow2->getSize() > 0);
@@ -573,45 +473,23 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
auto archives = read_archives(callback);
REQUIRE(archives.size() == 3);
for (int i = 3; i < 6; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ifstream file1(flowFileName, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(archives[i-3].to_string() == contents);
+ REQUIRE(archives[i-3].to_string() == flowFileContents[i]);
}
}
}
TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 6; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- if (i < 3)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
- }
-
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_ZIP_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_ZIP_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MinSize, "96");
+ context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
for (const int i : {0, 1, 2, 3, 4, 5}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
flow->setAttribute("tag", "tag");
sessionGenFlowFile.flushContent();
input->put(flow);
@@ -636,10 +514,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
auto archives = read_archives(callback);
REQUIRE(archives.size() == 3);
for (int i = 0; i < 3; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ifstream file1(flowFileName, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(archives[i].to_string() == contents);
+ REQUIRE(archives[i].to_string() == flowFileContents[i]);
}
}
REQUIRE(flow2->getSize() > 0);
@@ -649,41 +524,28 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
auto archives = read_archives(callback);
REQUIRE(archives.size() == 3);
for (int i = 3; i < 6; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ifstream file1(flowFileName, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(archives[i-3].to_string() == contents);
+ REQUIRE(archives[i-3].to_string() == flowFileContents[i]);
}
}
}
TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 6; i++) {
- std::ofstream{std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt", std::ios::binary} << std::to_string(i);
- if (i % 2 == 0)
- expectfileFirst << std::to_string(i);
- else
- expectfileSecond << std::to_string(i);
- }
- }
+ std::string expected[2]{
+ flowFileContents[0] + flowFileContents[2] + flowFileContents[4],
+ flowFileContents[1] + flowFileContents[3] + flowFileContents[5]
+ };
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinEntries, "3");
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MinEntries, "3");
+ context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
// Generate 6 flowfiles, even files are merged to one, odd files are merged to an other
for (const int i : {0, 1, 2, 3, 4, 5}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
if (i % 2 == 0)
flow->setAttribute("tag", "even");
else
@@ -707,45 +569,26 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]"
{
FixedBuffer callback(flow1->getSize());
sessionGenFlowFile.read(flow1, &callback);
- std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[0]);
}
{
FixedBuffer callback(flow2->getSize());
sessionGenFlowFile.read(flow2, &callback);
- std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
- std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
- REQUIRE(callback.to_string() == contents);
+ REQUIRE(callback.to_string() == expected[1]);
}
}
TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 3; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- expectfileFirst << std::to_string(i);
- }
- }
- }
-
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
core::ProcessSession sessionGenFlowFile(context);
// Generate 3 flowfiles merging all into one
for (const int i : {1, 2, 0}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
@@ -785,31 +628,16 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping Only C
}
TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
- {
- std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-
- // Create and write to the test file
- for (int i = 0; i < 3; i++) {
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
- for (int j = 0; j < 32; j++) {
- tmpfile << std::to_string(i);
- expectfileFirst << std::to_string(i);
- }
- }
- }
-
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
- context->setProperty(org::apache::nifi::minifi::processors::MergeContent::AttributeStrategy, org::apache::nifi::minifi::processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::MergeContent::AttributeStrategy, processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
core::ProcessSession sessionGenFlowFile(context);
// Generate 3 flowfiles merging all into one
for (const int i : {1, 2, 0}) {
- const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
- std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
- sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
@@ -942,3 +770,51 @@ TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
LogTestController::getInstance().reset();
}
+
+TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]") {
+ std::string expected[2]{
+ flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+ flowFileContents[3] + flowFileContents[4]
+ };
+
+ context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+ context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+ context->setProperty(processors::BinFiles::BatchSize, "3");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ // enqueue 5 (five) flowFiles
+ for (const int i : {0, 1, 2, 3, 4}) {
+ const auto flow = sessionGenFlowFile.create();
+ sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
+ }
+
+ REQUIRE(processor->getName() == "mergecontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ // two trigger is enough to process all five flowFiles
+ for (int i = 0; i < 2; i++) {
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);
+ session->commit();
+ }
+ // validate the merge content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+ std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+ REQUIRE(expiredFlowRecords.size() == 0);
+ REQUIRE(flow1);
+ {
+ FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
+ sessionGenFlowFile.read(flow1, &callback);
+ REQUIRE(callback.to_string() == expected[0]);
+ }
+ REQUIRE(flow2);
+ {
+ FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
+ sessionGenFlowFile.read(flow2, &callback);
+ REQUIRE(callback.to_string() == expected[1]);
+ }
+}
diff --git a/libminifi/test/unit/EnumTests.cpp b/libminifi/test/unit/EnumTests.cpp
new file mode 100644
index 0000000..574c768
--- /dev/null
+++ b/libminifi/test/unit/EnumTests.cpp
@@ -0,0 +1,105 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+#include "utils/Enum.h"
+#include "catch.hpp"
+
+// we need this instead of void_t in GeneralUtils because of GCC4.8
+template<typename ...>
+struct make_void {
+ using type = void;
+};
+
+#define CHECK_COMPILE(name, clazz, expr, result) \
+ template<typename, typename = void> \
+ struct does_compile ## name : std::false_type {};\
+ template<typename T> \
+ struct does_compile ## name<T, typename make_void<decltype(std::declval<T>().template expr)>::type> : std::true_type {}; \
+ static_assert(does_compile ## name<clazz>::value == result, "");
+
+SMART_ENUM(A,
+ (_0, "zero"),
+ (_1, "one")
+)
+
+SMART_ENUM_EXTEND(B, A, (_0, _1),
+ (_2, "two")
+)
+
+SMART_ENUM_EXTEND(C, B, (_0, _1, _2),
+ (_3, "three")
+)
+
+SMART_ENUM(Unrelated,
+ (a, "a"),
+ (b, "b")
+)
+
+// static tests
+namespace test {
+
+CHECK_COMPILE(_1, B, cast<A>(), true)
+CHECK_COMPILE(_2, C, cast<A>(), true)
+CHECK_COMPILE(_3, C, cast<B>(), true)
+
+// casting to unrelated fails
+CHECK_COMPILE(_7, B, cast<Unrelated>(), false)
+CHECK_COMPILE(_8, B, cast<Unrelated>(), false)
+
+} // namespace test
+
+TEST_CASE("Enum checks") {
+ REQUIRE(!A{});
+ REQUIRE(!B{});
+ REQUIRE(A{A::Type(0)});
+ REQUIRE(B{B::Type(1)});
+
+ REQUIRE(A::values() == (std::set<std::string>{"zero", "one"}));
+ REQUIRE(B::values() == (std::set<std::string>{"zero", "one", "two"}));
+ REQUIRE(C::values() == (std::set<std::string>{"zero", "one", "two", "three"}));
+
+ REQUIRE_THROWS(A::parse("not_any"));
+ REQUIRE(!A::parse("not_any", false));
+ REQUIRE(A::parse("zero") == A::_0);
+ REQUIRE(B::parse("zero") == B::_0);
+ REQUIRE(C::parse("one") == C::_1);
+ REQUIRE(C::parse("three") == C::_3);
+ REQUIRE_THROWS(C::parse("nada"));
+ REQUIRE(!C::parse("nada", false));
+
+ REQUIRE(A{A::_0}.toString() == std::string{"zero"});
+ REQUIRE(toString(A::_0) == std::string{"zero"});
+ REQUIRE(B{B::_0}.toString() == std::string{"zero"});
+ REQUIRE(toString(B::_0) == std::string{"zero"});
+ REQUIRE(toString(B::_2) == std::string{"two"});
+ REQUIRE(toString(C::_1) == std::string{"one"});
+ REQUIRE(toString(C::_3) == std::string{"three"});
+ REQUIRE_THROWS(toString(A::Type(55)));
+ REQUIRE_THROWS(toString(C::Type(-1)));
+ REQUIRE_THROWS(C{}.toString());
+
+ REQUIRE(B{B::_0}.cast<A>() == A{A::_0});
+ REQUIRE(C{C::_1}.cast<A>() == A::_1);
+ REQUIRE(C{C::_2}.cast<B>() == B::_2);
+ REQUIRE(!C{C::_3}.cast<B>());
+ REQUIRE(!C{C::_3}.cast<A>());
+ REQUIRE(!B{B::_2}.cast<A>());
+}