You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/11/10 13:25:36 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1380 - Batch behavior for MergeContent and CompressContent

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ed523d4  MINIFICPP-1380 - Batch behavior for MergeContent and CompressContent
ed523d4 is described below

commit ed523d47b7f5db83bd5124466e122459dd340fb4
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue Sep 29 14:39:01 2020 +0200

    MINIFICPP-1380 - Batch behavior for MergeContent and CompressContent
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #917
---
 extensions/civetweb/processors/ListenHTTP.cpp      |   2 +-
 extensions/libarchive/BinFiles.cpp                 |  94 +++--
 extensions/libarchive/BinFiles.h                   |  65 ++-
 extensions/libarchive/CompressContent.cpp          | 126 +++---
 extensions/libarchive/CompressContent.h            |  68 ++--
 extensions/libarchive/MergeContent.cpp             |   1 +
 .../processors/ListenSyslog.cpp                    |   2 +-
 .../processors/RetryFlowFile.cpp                   |   2 +-
 libminifi/include/core/CachedValueValidator.h      |   2 +-
 libminifi/include/core/ProcessSession.h            |   2 +
 libminifi/include/core/Property.h                  |  10 +-
 libminifi/include/core/PropertyValidation.h        |  37 +-
 libminifi/include/io/BufferStream.h                |   5 +
 libminifi/include/utils/Enum.h                     | 200 ++++++++++
 libminifi/include/utils/StringUtils.h              |  17 +
 libminifi/src/core/ProcessSession.cpp              |   3 +
 libminifi/src/core/PropertyValidation.cpp          |   6 +-
 libminifi/test/Utils.h                             |   7 +-
 .../test/archive-tests/CompressContentTests.cpp    | 188 ++++++---
 libminifi/test/archive-tests/MergeFileTests.cpp    | 434 ++++++++-------------
 libminifi/test/unit/EnumTests.cpp                  | 105 +++++
 21 files changed, 842 insertions(+), 534 deletions(-)

diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 39b7e8e..fa78527 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -38,7 +38,7 @@ core::Property ListenHTTP::Port(
     core::PropertyBuilder::createProperty("Listening Port")
         ->withDescription("The Port to listen on for incoming connections. 0 means port is going to be selected randomly.")
         ->isRequired(true)
-        ->withDefaultValue<int>(80, core::StandardValidators::LISTEN_PORT_VALIDATOR())->build());
+        ->withDefaultValue<int>(80, core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
 core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
                                                " connections. If the Pattern does not match the DN, the connection will be refused.",
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index a6b349e..39aef68 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -38,12 +38,34 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property BinFiles::MinSize("Minimum Group Size", "The minimum size of for the bundle", "0");
-core::Property BinFiles::MaxSize("Maximum Group Size", "The maximum size for the bundle. If not specified, there is no maximum.", "");
-core::Property BinFiles::MinEntries("Minimum Number of Entries", "The minimum number of files to include in a bundle", "1");
-core::Property BinFiles::MaxEntries("Maximum Number of Entries", "The maximum number of files to include in a bundle. If not specified, there is no maximum.", "");
-core::Property BinFiles::MaxBinAge("Max Bin Age", "The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>", "");
-core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
+core::Property BinFiles::MinSize(
+    core::PropertyBuilder::createProperty("Minimum Group Size")
+    ->withDescription("The minimum size of for the bundle")
+    ->withDefaultValue<uint64_t>(0)->build());
+core::Property BinFiles::MaxSize(
+    core::PropertyBuilder::createProperty("Maximum Group Size")
+    ->withDescription("The maximum size for the bundle. If not specified, there is no maximum.")
+    ->withType(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR)->build());
+core::Property BinFiles::MinEntries(
+    core::PropertyBuilder::createProperty("Minimum Number of Entries")
+    ->withDescription("The minimum number of files to include in a bundle")
+    ->withDefaultValue<uint32_t>(1)->build());
+core::Property BinFiles::MaxEntries(
+    core::PropertyBuilder::createProperty("Maximum Number of Entries")
+    ->withDescription("The maximum number of files to include in a bundle. If not specified, there is no maximum.")
+    ->withType(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR)->build());
+core::Property BinFiles::MaxBinAge(
+    core::PropertyBuilder::createProperty("Max Bin Age")
+    ->withDescription("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>")
+    ->withType(core::StandardValidators::get().TIME_PERIOD_VALIDATOR)->build());
+core::Property BinFiles::MaxBinCount(
+    core::PropertyBuilder::createProperty("Maximum number of Bins")
+    ->withDescription("Specifies the maximum number of bins that can be held in memory at any one time")
+    ->withDefaultValue<uint32_t>(100)->build());
+core::Property BinFiles::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")
+    ->withDescription("Maximum number of FlowFiles processed in a single session")
+    ->withDefaultValue<uint32_t>(1)->build());
 core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
 core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure");
 core::Relationship BinFiles::Self("__self__", "Marks the FlowFile to be owned by this processor");
@@ -65,6 +87,7 @@ void BinFiles::initialize() {
   properties.insert(MaxEntries);
   properties.insert(MaxBinAge);
   properties.insert(MaxBinCount);
+  properties.insert(BatchSize);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -74,41 +97,38 @@ void BinFiles::initialize() {
 }
 
 void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  std::string value;
-  int64_t valInt64;
-  int valInt;
-  if (context->getProperty(MinSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt64)) {
-    this->binManager_.setMinSize(valInt64);
-    logger_->log_debug("BinFiles: MinSize [%" PRId64 "]", valInt64);
+  uint32_t val32;  // scratch for the 32-bit entry-count properties (MinEntries / MaxEntries)
+  uint64_t val64;  // scratch for the 64-bit size properties and the parsed bin age
+  if (context->getProperty(MinSize.getName(), val64)) {
+    this->binManager_.setMinSize({val64});
+    logger_->log_debug("BinFiles: MinSize [%" PRIu64 "]", val64);  // val64 is unsigned, so use the unsigned format macro
   }
-  value = "";
-  if (context->getProperty(MaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt64)) {
-    this->binManager_.setMaxSize(valInt64);
-    logger_->log_debug("BinFiles: MaxSize [%" PRId64 "]", valInt64);
+  if (context->getProperty(MaxSize.getName(), val64)) {
+    this->binManager_.setMaxSize({val64});
+    logger_->log_debug("BinFiles: MaxSize [%" PRIu64 "]", val64);  // unsigned: large limits must not print as negative
   }
-  value = "";
-  if (context->getProperty(MinEntries.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    this->binManager_.setMinEntries(valInt);
-    logger_->log_debug("BinFiles: MinEntries [%d]", valInt);
+  if (context->getProperty(MinEntries.getName(), val32)) {
+    this->binManager_.setMinEntries({val32});
+    logger_->log_debug("BinFiles: MinEntries [%" PRIu32 "]", val32);
   }
-  value = "";
-  if (context->getProperty(MaxEntries.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    this->binManager_.setMaxEntries(valInt);
-    logger_->log_debug("BinFiles: MaxEntries [%d]", valInt);
+  if (context->getProperty(MaxEntries.getName(), val32)) {
+    this->binManager_.setMaxEntries({val32});
+    logger_->log_debug("BinFiles: MaxEntries [%" PRIu32 "]", val32);
   }
-  value = "";
-  if (context->getProperty(MaxBinCount.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    maxBinCount_ = valInt;
-    logger_->log_debug("BinFiles: MaxBinCount [%d]", valInt);
+  if (context->getProperty(MaxBinCount.getName(), maxBinCount_)) {
+    logger_->log_debug("BinFiles: MaxBinCount [%" PRIu32 "]", maxBinCount_);
   }
-  value = "";
-  if (context->getProperty(MaxBinAge.getName(), value) && !value.empty()) {
+  std::string maxBinAgeStr;
+  if (context->getProperty(MaxBinAge.getName(), maxBinAgeStr)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, valInt64, unit) && core::Property::ConvertTimeUnitToMS(valInt64, unit, valInt64)) {
-      this->binManager_.setBinAge(valInt64);
-      logger_->log_debug("BinFiles: MaxBinAge [%" PRId64 "]", valInt64);
+    if (core::Property::StringToTime(maxBinAgeStr, val64, unit) && core::Property::ConvertTimeUnitToMS(val64, unit, val64)) {
+      this->binManager_.setBinAge({val64});  // val64 now holds the output of ConvertTimeUnitToMS
+      logger_->log_debug("BinFiles: MaxBinAge [%" PRIu64 "]", val64);
     }
   }
+  if (context->getProperty(BatchSize.getName(), batchSize_)) {
+    logger_->log_debug("BinFiles: BatchSize [%" PRIu32 "]", batchSize_);
+  }
 }
 
 void BinFiles::preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow) {
@@ -259,9 +279,13 @@ void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
     }
   }
 
-  auto flow = session->get();
+  for (size_t i = 0; i < batchSize_; ++i) {
+    auto flow = session->get();
+
+    if (flow == nullptr) {
+      break;
+    }
 
-  if (flow != nullptr) {
     preprocessFlowFile(context.get(), session.get(), flow);
     std::string groupId = getGroupId(context.get(), flow);
 
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index b7f7e6f..a3e594d 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -21,7 +21,7 @@
 #define __BIN_FILES_H__
 
 #include <cinttypes>
-#include <climits>
+#include <limits>
 #include <deque>
 #include <map>
 #include "FlowFileRecord.h"
@@ -146,36 +146,23 @@ class Bin {
 // BinManager Class
 class BinManager {
  public:
-  // Constructor
-  /*!
-   * Create a new BinManager
-   */
-  BinManager()
-      : minSize_(0),
-        maxSize_(ULLONG_MAX),
-        maxEntries_(INT_MAX),
-        minEntries_(1),
-        binAge_(ULLONG_MAX),
-        binCount_(0),
-        logger_(logging::LoggerFactory<BinManager>::getLogger()) {
-  }
   virtual ~BinManager() {
     purge();
   }
-  void setMinSize(const uint64_t &size) {
-    minSize_ = size;
+  void setMinSize(uint64_t size) {
+    minSize_ = {size};
   }
-  void setMaxSize(const uint64_t &size) {
-    maxSize_ = size;
+  void setMaxSize(uint64_t size) {
+    maxSize_ = {size};
   }
-  void setMaxEntries(const int &entries) {
-    maxEntries_ = entries;
+  void setMaxEntries(uint32_t entries) {
+    maxEntries_ = {entries};
   }
-  void setMinEntries(const int &entries) {
-    minEntries_ = entries;
+  void setMinEntries(uint32_t entries) {
+    minEntries_ = {entries};
   }
-  void setBinAge(const uint64_t &age) {
-    binAge_ = age;
+  void setBinAge(uint64_t age) {
+    binAge_ = {age};
   }
   int getBinCount() {
     return binCount_;
@@ -201,17 +188,17 @@ class BinManager {
 
  private:
   std::mutex mutex_;
-  uint64_t minSize_;
-  uint64_t maxSize_;
-  int maxEntries_;
-  int minEntries_;
+  uint64_t minSize_{0};
+  uint64_t maxSize_{std::numeric_limits<decltype(maxSize_)>::max()};
+  uint32_t maxEntries_{std::numeric_limits<decltype(maxEntries_)>::max()};
+  uint32_t minEntries_{1};
   std::string fileCount_;
   // Bin Age in msec
-  uint64_t binAge_;
+  uint64_t binAge_{std::numeric_limits<decltype(binAge_)>::max()};
   std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>> >groupBinMap_;
   std::deque<std::unique_ptr<Bin>> readyBin_;
-  int binCount_;
-  std::shared_ptr<logging::Logger> logger_;
+  int binCount_{0};
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BinManager>::getLogger()};
 };
 
 // BinFiles Class
@@ -219,15 +206,7 @@ class BinFiles : public core::Processor {
  protected:
   static core::Relationship Self;
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit BinFiles(std::string name, utils::Identifier uuid = utils::Identifier())
-      : core::Processor(name, uuid),
-        logger_(logging::LoggerFactory<BinFiles>::getLogger()) {
-    maxBinCount_ = 100;
-  }
+  using core::Processor::Processor;
   // Destructor
   virtual ~BinFiles() = default;
   // Processor Name
@@ -239,6 +218,7 @@ class BinFiles : public core::Processor {
   static core::Property MaxEntries;
   static core::Property MaxBinCount;
   static core::Property MaxBinAge;
+  static core::Property BatchSize;
 
   // Supported Relationships
   static core::Relationship Failure;
@@ -308,8 +288,9 @@ class BinFiles : public core::Processor {
     std::unordered_set<std::shared_ptr<core::FlowFile>> incoming_files_;
   };
 
-  std::shared_ptr<logging::Logger> logger_;
-  int maxBinCount_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BinFiles>::getLogger()};
+  uint32_t batchSize_{1};
+  uint32_t maxBinCount_{100};
   FlowFileStore file_store_;
 };
 
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 4aa86fd..c0cb715 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -40,16 +40,13 @@ core::Property CompressContent::CompressLevel(
         ->isRequired(false)->withDefaultValue<int>(1)->build());
 core::Property CompressContent::CompressMode(
     core::PropertyBuilder::createProperty("Mode")->withDescription("Indicates whether the processor should compress content or decompress content.")
-        ->isRequired(false)->withAllowableValues<std::string>({MODE_COMPRESS, MODE_DECOMPRESS})->withDefaultValue(MODE_COMPRESS)->build());
+        ->isRequired(false)->withAllowableValues(CompressionMode::values())
+        ->withDefaultValue(toString(CompressionMode::Compress))->build());
 core::Property CompressContent::CompressFormat(
     core::PropertyBuilder::createProperty("Compression Format")->withDescription("The compression format to use.")
         ->isRequired(false)
-        ->withAllowableValues<std::string>({
-          COMPRESSION_FORMAT_ATTRIBUTE,
-          COMPRESSION_FORMAT_GZIP,
-          COMPRESSION_FORMAT_BZIP2,
-          COMPRESSION_FORMAT_XZ_LZMA2,
-          COMPRESSION_FORMAT_LZMA})->withDefaultValue(COMPRESSION_FORMAT_ATTRIBUTE)->build());
+        ->withAllowableValues(ExtendedCompressionFormat::values())
+        ->withDefaultValue(toString(ExtendedCompressionFormat::USE_MIME_TYPE))->build());
 core::Property CompressContent::UpdateFileName(
     core::PropertyBuilder::createProperty("Update Filename")->withDescription("Determines if filename extension need to be updated")
         ->isRequired(false)->withDefaultValue<bool>(false)->build());
@@ -60,10 +57,29 @@ core::Property CompressContent::EncapsulateInTar(
                           "If false, on compression the content of the FlowFile simply gets compressed, and on decompression a simple compressed content is expected.\n"
                           "true is the behaviour compatible with older MiNiFi C++ versions, false is the behaviour compatible with NiFi.")
         ->isRequired(false)->withDefaultValue<bool>(true)->build());
+core::Property CompressContent::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")
+    ->withDescription("Maximum number of FlowFiles processed in a single session")
+    ->withDefaultValue<uint32_t>(1)->build());
 
 core::Relationship CompressContent::Success("success", "FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed");
 core::Relationship CompressContent::Failure("failure", "FlowFiles will be transferred to the failure relationship if they fail to compress/decompress");
 
+const std::map<std::string, CompressContent::CompressionFormat> CompressContent::compressionFormatMimeTypeMap_{
+  {"application/gzip", CompressionFormat::GZIP},
+  {"application/bzip2", CompressionFormat::BZIP2},
+  {"application/x-bzip2", CompressionFormat::BZIP2},
+  {"application/x-lzma", CompressionFormat::LZMA},
+  {"application/x-xz", CompressionFormat::XZ_LZMA2}
+};
+
+const std::map<CompressContent::CompressionFormat, std::string> CompressContent::fileExtension_{
+  {CompressionFormat::GZIP, ".gz"},
+  {CompressionFormat::LZMA, ".lzma"},
+  {CompressionFormat::BZIP2, ".bz2"},
+  {CompressionFormat::XZ_LZMA2, ".xz"}
+};
+
 void CompressContent::initialize() {
   // Set the supported properties
   std::set<core::Property> properties;
@@ -72,6 +88,7 @@ void CompressContent::initialize() {
   properties.insert(CompressFormat);
   properties.insert(UpdateFileName);
   properties.insert(EncapsulateInTar);
+  properties.insert(BatchSize);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -81,39 +98,38 @@ void CompressContent::initialize() {
 }
 
 void CompressContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  std::string value;
   context->getProperty(CompressLevel.getName(), compressLevel_);
   context->getProperty(CompressMode.getName(), compressMode_);
   context->getProperty(CompressFormat.getName(), compressFormat_);
   context->getProperty(UpdateFileName.getName(), updateFileName_);
   context->getProperty(EncapsulateInTar.getName(), encapsulateInTar_);
+  context->getProperty(BatchSize.getName(), batchSize_);
 
   logger_->log_info("Compress Content: Mode [%s] Format [%s] Level [%d] UpdateFileName [%d] EncapsulateInTar [%d]",
-      compressMode_, compressFormat_, compressLevel_, updateFileName_, encapsulateInTar_);
-
-  // update the mimeTypeMap
-  compressionFormatMimeTypeMap_["application/gzip"] = COMPRESSION_FORMAT_GZIP;
-  compressionFormatMimeTypeMap_["application/bzip2"] = COMPRESSION_FORMAT_BZIP2;
-  compressionFormatMimeTypeMap_["application/x-bzip2"] = COMPRESSION_FORMAT_BZIP2;
-  compressionFormatMimeTypeMap_["application/x-lzma"] = COMPRESSION_FORMAT_LZMA;
-  compressionFormatMimeTypeMap_["application/x-xz"] = COMPRESSION_FORMAT_XZ_LZMA2;
-  fileExtension_[COMPRESSION_FORMAT_GZIP] = ".gz";
-  fileExtension_[COMPRESSION_FORMAT_LZMA] = ".lzma";
-  fileExtension_[COMPRESSION_FORMAT_BZIP2] = ".bz2";
-  fileExtension_[COMPRESSION_FORMAT_XZ_LZMA2] = ".xz";
+      compressMode_.toString(), compressFormat_.toString(), compressLevel_, updateFileName_, encapsulateInTar_);
 }
 
 void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
-
-  if (!flowFile) {
+  size_t processedFlowFileCount = 0;
+  for (; processedFlowFileCount < batchSize_; ++processedFlowFileCount) {
+    std::shared_ptr<core::FlowFile> flowFile = session->get();
+    if (!flowFile) {
+      break;
+    }
+    processFlowFile(flowFile, session);
+  }
+  if (processedFlowFileCount == 0) {
+    // we got no flowFiles
+    context->yield();
     return;
   }
+}
 
+void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session) {
   session->remove(flowFile);
 
-  std::string compressFormat = compressFormat_;
-  if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) {
+  CompressionFormat compressFormat;
+  if (compressFormat_ == ExtendedCompressionFormat::USE_MIME_TYPE) {
     std::string attr;
     flowFile->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, attr);
     if (attr.empty()) {
@@ -129,36 +145,24 @@ void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &con
       session->transfer(flowFile, Success);
       return;
     }
-  }
-  std::transform(compressFormat.begin(), compressFormat.end(), compressFormat.begin(), ::tolower);
-  std::string mimeType;
-  if (compressFormat == COMPRESSION_FORMAT_GZIP) {
-    mimeType = "application/gzip";
-  } else if (compressFormat == COMPRESSION_FORMAT_BZIP2) {
-    mimeType = "application/bzip2";
-  } else if (compressFormat == COMPRESSION_FORMAT_LZMA) {
-    mimeType = "application/x-lzma";
-  } else if (compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) {
-    mimeType = "application/x-xz";
   } else {
-    logger_->log_error("Compress format is invalid %s", compressFormat);
-    session->transfer(flowFile, Failure);
-    return;
+    compressFormat = compressFormat_.cast<CompressionFormat>();
   }
+  std::string mimeType = toMimeType(compressFormat);
 
   // Validate
-  if (!encapsulateInTar_ && compressFormat != COMPRESSION_FORMAT_GZIP) {
+  if (!encapsulateInTar_ && compressFormat != CompressionFormat::GZIP) {
     logger_->log_error("non-TAR encapsulated format only supports GZIP compression");
     session->transfer(flowFile, Failure);
     return;
   }
-  if (compressFormat == COMPRESSION_FORMAT_BZIP2 && archive_bzlib_version() == nullptr) {
-    logger_->log_error("%s compression format is requested, but the agent was compiled without BZip2 support", compressFormat);
+  if (compressFormat == CompressionFormat::BZIP2 && archive_bzlib_version() == nullptr) {
+    logger_->log_error("%s compression format is requested, but the agent was compiled without BZip2 support", compressFormat.toString());
     session->transfer(flowFile, Failure);
     return;
   }
-  if ((compressFormat == COMPRESSION_FORMAT_LZMA || compressFormat == COMPRESSION_FORMAT_XZ_LZMA2) && archive_liblzma_version() == nullptr) {
-    logger_->log_error("%s compression format is requested, but the agent was compiled without LZMA support ", compressFormat);
+  if ((compressFormat == CompressionFormat::LZMA || compressFormat == CompressionFormat::XZ_LZMA2) && archive_liblzma_version() == nullptr) {
+    logger_->log_error("%s compression format is requested, but the agent was compiled without LZMA support ", compressFormat.toString());
     session->transfer(flowFile, Failure);
     return;
   }
@@ -168,43 +172,53 @@ void CompressContent::onTrigger(const std::shared_ptr<core::ProcessContext> &con
   if (search != fileExtension_.end()) {
     fileExtension = search->second;
   }
-  std::shared_ptr<core::FlowFile> processFlowFile = session->create(flowFile);
+  std::shared_ptr<core::FlowFile> result = session->create(flowFile);
   bool success = false;
   if (encapsulateInTar_) {
     CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
-    session->write(processFlowFile, &callback);
+    session->write(result, &callback);
     success = callback.status_ >= 0;
   } else {
     CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
-    session->write(processFlowFile, &callback);
+    session->write(result, &callback);
     success = callback.success_;
   }
 
   if (!success) {
     logger_->log_error("Compress Content processing fail for the flow with UUID %s", flowFile->getUUIDStr());
     session->transfer(flowFile, Failure);
-    session->remove(processFlowFile);
+    session->remove(result);
   } else {
     std::string fileName;
-    processFlowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
-    if (compressMode_ == MODE_COMPRESS) {
-      session->putAttribute(processFlowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
+    result->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
+    if (compressMode_ == CompressionMode::Compress) {
+      session->putAttribute(result, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
       if (updateFileName_) {
         fileName = fileName + fileExtension;
-        session->putAttribute(processFlowFile, core::SpecialFlowAttribute::FILENAME, fileName);
+        session->putAttribute(result, core::SpecialFlowAttribute::FILENAME, fileName);
       }
     } else {
-      session->removeAttribute(processFlowFile, core::SpecialFlowAttribute::MIME_TYPE);
+      session->removeAttribute(result, core::SpecialFlowAttribute::MIME_TYPE);
       if (updateFileName_) {
         if (fileName.size() >= fileExtension.size() && fileName.compare(fileName.size() - fileExtension.size(), fileExtension.size(), fileExtension) == 0) {
           fileName = fileName.substr(0, fileName.size() - fileExtension.size());
-          session->putAttribute(processFlowFile, core::SpecialFlowAttribute::FILENAME, fileName);
+          session->putAttribute(result, core::SpecialFlowAttribute::FILENAME, fileName);
         }
       }
     }
-    logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName);
-    session->transfer(processFlowFile, Success);
+    logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", result->getUUIDStr(), fileName);
+    session->transfer(result, Success);
+  }
+}
+
+std::string CompressContent::toMimeType(CompressionFormat format) {
+  switch (format.value()) {
+    case CompressionFormat::GZIP: return "application/gzip";
+    case CompressionFormat::BZIP2: return "application/bzip2";
+    case CompressionFormat::LZMA: return "application/x-lzma";
+    case CompressionFormat::XZ_LZMA2: return "application/x-xz";
   }
+  throw Exception(GENERAL_EXCEPTION, "Invalid compression format");
 }
 
 } /* namespace processors */
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 8cda1d6..3f87685 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -33,6 +33,7 @@
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "io/ZlibStream.h"
+#include "utils/Enum.h"
 
 namespace org {
 namespace apache {
@@ -40,15 +41,6 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-#define COMPRESSION_FORMAT_ATTRIBUTE "use mime.type attribute"
-#define COMPRESSION_FORMAT_GZIP "gzip"
-#define COMPRESSION_FORMAT_BZIP2 "bzip2"
-#define COMPRESSION_FORMAT_XZ_LZMA2 "xz-lzma2"
-#define COMPRESSION_FORMAT_LZMA "lzma"
-
-#define MODE_COMPRESS "compress"
-#define MODE_DECOMPRESS "decompress"
-
 // CompressContent Class
 class CompressContent: public core::Processor {
 public:
@@ -72,11 +64,28 @@ public:
   static core::Property CompressFormat;
   static core::Property UpdateFileName;
   static core::Property EncapsulateInTar;
+  static core::Property BatchSize;
 
   // Supported Relationships
   static core::Relationship Failure;
   static core::Relationship Success;
 
+  SMART_ENUM(CompressionMode,
+    (Compress, "compress"),
+    (Decompress, "decompress")
+  )
+
+  SMART_ENUM(CompressionFormat,
+    (GZIP, "gzip"),
+    (LZMA, "lzma"),
+    (XZ_LZMA2, "xz-lzma2"),
+    (BZIP2, "bzip2")
+  )
+
+  SMART_ENUM_EXTEND(ExtendedCompressionFormat, CompressionFormat, (GZIP, LZMA, XZ_LZMA2, BZIP2),
+    (USE_MIME_TYPE, "use mime.type attribute")
+  )
+
 public:
   // Nest Callback Class for read stream from flow for compress
   class ReadCallbackCompress: public InputStreamCallback {
@@ -125,7 +134,7 @@ public:
   // Nest Callback Class for read stream from flow for decompress
   class ReadCallbackDecompress: public InputStreamCallback {
   public:
-    ReadCallbackDecompress(std::shared_ptr<core::FlowFile> &flow) :
+    ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
         read_size_(0), offset_(0), flow_(flow) {
       origin_offset_ = flow_->getOffset();
     }
@@ -149,8 +158,8 @@ public:
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
   public:
-    WriteCallback(std::string &compress_mode, int compress_level, std::string &compress_format,
-        std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) :
+    WriteCallback(CompressionMode compress_mode, int compress_level, CompressionFormat compress_format,
+        const std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) :
         compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format),
         flow_(flow), session_(session),
         logger_(logging::LoggerFactory<CompressContent>::getLogger()),
@@ -161,9 +170,9 @@ public:
     }
     ~WriteCallback() = default;
 
-    std::string compress_mode_;
+    CompressionMode compress_mode_;
     int compress_level_;
-    std::string compress_format_;
+    CompressionFormat compress_format_;
     std::shared_ptr<core::FlowFile> flow_;
     std::shared_ptr<core::ProcessSession> session_;
     std::shared_ptr<io::BaseStream> stream_;
@@ -212,7 +221,7 @@ public:
       struct archive *arch;
       int r;
 
-      if (compress_mode_ == MODE_COMPRESS) {
+      if (compress_mode_ == CompressionMode::Compress) {
         arch = archive_write_new();
         if (!arch) {
           status_ = -1;
@@ -223,7 +232,7 @@ public:
           archive_write_log_error_cleanup(arch);
           return -1;
         }
-        if (compress_format_ == COMPRESSION_FORMAT_GZIP) {
+        if (compress_format_ == CompressionFormat::GZIP) {
           r = archive_write_add_filter_gzip(arch);
           if (r != ARCHIVE_OK) {
             archive_write_log_error_cleanup(arch);
@@ -236,19 +245,19 @@ public:
             archive_write_log_error_cleanup(arch);
             return -1;
           }
-        } else if (compress_format_ == COMPRESSION_FORMAT_BZIP2) {
+        } else if (compress_format_ == CompressionFormat::BZIP2) {
           r = archive_write_add_filter_bzip2(arch);
           if (r != ARCHIVE_OK) {
             archive_write_log_error_cleanup(arch);
             return -1;
           }
-        } else if (compress_format_ == COMPRESSION_FORMAT_LZMA) {
+        } else if (compress_format_ == CompressionFormat::LZMA) {
           r = archive_write_add_filter_lzma(arch);
           if (r != ARCHIVE_OK) {
             archive_write_log_error_cleanup(arch);
             return -1;
           }
-        } else if (compress_format_ == COMPRESSION_FORMAT_XZ_LZMA2) {
+        } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
           r = archive_write_add_filter_xz(arch);
           if (r != ARCHIVE_OK) {
             archive_write_log_error_cleanup(arch);
@@ -346,7 +355,7 @@ public:
 
   class GzipWriteCallback : public OutputStreamCallback {
    public:
-    GzipWriteCallback(std::string compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
+    GzipWriteCallback(CompressionMode compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
       : logger_(logging::LoggerFactory<CompressContent>::getLogger())
       , compress_mode_(std::move(compress_mode))
       , compress_level_(compress_level)
@@ -355,7 +364,7 @@ public:
     }
 
     std::shared_ptr<logging::Logger> logger_;
-    std::string compress_mode_;
+    CompressionMode compress_mode_;
     int compress_level_;
     std::shared_ptr<core::FlowFile> flow_;
     std::shared_ptr<core::ProcessSession> session_;
@@ -394,7 +403,7 @@ public:
       };
 
       std::shared_ptr<io::ZlibBaseStream> filterStream;
-      if (compress_mode_ == MODE_COMPRESS) {
+      if (compress_mode_ == CompressionMode::Compress) {
         filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
       } else {
         filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP);
@@ -424,17 +433,20 @@ public:
   // Initialize, over write by NiFi CompressContent
   virtual void initialize(void);
 
-protected:
-
 private:
+  static std::string toMimeType(CompressionFormat format);
+
+  void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session);
+
   std::shared_ptr<logging::Logger> logger_;
   int compressLevel_;
-  std::string compressMode_;
-  std::string compressFormat_;
+  CompressionMode compressMode_;
+  ExtendedCompressionFormat compressFormat_;
   bool updateFileName_;
   bool encapsulateInTar_;
-  std::map<std::string, std::string> compressionFormatMimeTypeMap_;
-  std::map<std::string, std::string> fileExtension_;
+  uint32_t batchSize_{1};
+  static const std::map<std::string, CompressionFormat> compressionFormatMimeTypeMap_;
+  static const std::map<CompressionFormat, std::string> fileExtension_;
 };
 
 REGISTER_RESOURCE(CompressContent, "Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate");
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index e702eb5..ecf0407 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -89,6 +89,7 @@ void MergeContent::initialize() {
   properties.insert(MaxEntries);
   properties.insert(MaxBinAge);
   properties.insert(MaxBinCount);
+  properties.insert(BatchSize);
   properties.insert(MergeStrategy);
   properties.insert(MergeFormat);
   properties.insert(CorrelationAttributeName);
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp b/extensions/standard-processors/processors/ListenSyslog.cpp
index 692c39f..d271d5b 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -64,7 +64,7 @@ core::Property ListenSyslog::Protocol(
         "UDP")->build());
 
 core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::PORT_VALIDATOR())->build());
+    core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::get().PORT_VALIDATOR)->build());
 
 core::Relationship ListenSyslog::Success("success", "All files are routed to success");
 core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
diff --git a/extensions/standard-processors/processors/RetryFlowFile.cpp b/extensions/standard-processors/processors/RetryFlowFile.cpp
index de235e8..cd88e45 100644
--- a/extensions/standard-processors/processors/RetryFlowFile.cpp
+++ b/extensions/standard-processors/processors/RetryFlowFile.cpp
@@ -31,7 +31,7 @@ core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProper
         "The name of the attribute that contains the current retry count for the FlowFile."
         "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
         "the processor will either overwrite that attribute with '1' or fail based on configuration.")
-    ->withDefaultValue("flowfile.retries", core::StandardValidators::NON_BLANK_VALIDATOR())
+    ->withDefaultValue("flowfile.retries", core::StandardValidators::get().NON_BLANK_VALIDATOR)
     ->supportsExpressionLanguage(true)
     ->isRequired(true)
     ->build());
diff --git a/libminifi/include/core/CachedValueValidator.h b/libminifi/include/core/CachedValueValidator.h
index 1076027..9102248 100644
--- a/libminifi/include/core/CachedValueValidator.h
+++ b/libminifi/include/core/CachedValueValidator.h
@@ -112,7 +112,7 @@ class CachedValueValidator {
     validation_result_ = Result::RECOMPUTE;
   }
 
-  gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::VALID_VALIDATOR()};
+  gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::get().VALID_VALIDATOR};
   mutable Result validation_result_{Result::RECOMPUTE};
 };
 
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index c429570..60ddafc 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -109,6 +109,8 @@ class ProcessSession : public ReferenceContainer {
    * @param flow flow file
    */
   void importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow);
+  void importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow);
+
   // import from the data source.
   void import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0);
   DEPRECATED(/*deprecated in*/ 0.7.0, /*will remove in */ 2.0) void import(std::string source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter); // NOLINT
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index d832da4..44719bd 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -355,7 +355,7 @@ class Property {
       validator_ = StandardValidators::getValidator(ret.getValue());
     } else {
       ret = value;
-      validator_ = StandardValidators::VALID_VALIDATOR();
+      validator_ = StandardValidators::get().VALID_VALIDATOR;
     }
     return ret;
   }
@@ -369,7 +369,7 @@ class Property {
   bool is_collection_;
   PropertyValue default_value_;
   std::vector<PropertyValue> values_;
-  gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::VALID_VALIDATOR()};
+  gsl::not_null<std::shared_ptr<PropertyValidator>> validator_{StandardValidators::get().VALID_VALIDATOR};
   std::string display_name_;
   std::vector<PropertyValue> allowed_values_;
   // types represents the allowable types for this property
@@ -432,6 +432,12 @@ class PropertyBuilder : public std::enable_shared_from_this<PropertyBuilder> {
     return shared_from_this();
   }
 
+  std::shared_ptr<PropertyBuilder> withType(const std::shared_ptr<PropertyValidator> &validator) {
+    prop.validator_ = gsl::make_not_null(validator);
+    prop.default_value_.setValidator(gsl::make_not_null(validator));
+    return shared_from_this();
+  }
+
   template<typename T>
   std::shared_ptr<ConstrainedProperty<T>> withAllowableValue(const T& df) {
     auto property = std::make_shared<ConstrainedProperty<T>>(shared_from_this());
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index 800a3f5..3583749 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -330,9 +330,15 @@ class TimePeriodValidator : public PropertyValidator {
 // STATIC DEFINITIONS
 
 class StandardValidators {
+  StandardValidators();
+
  public:
-  static const gsl::not_null<std::shared_ptr<PropertyValidator>> &getValidator(const std::shared_ptr<minifi::state::response::Value> &input) {
+  static const StandardValidators& get() {
     static StandardValidators init;
+    return init;
+  }
+  static const gsl::not_null<std::shared_ptr<PropertyValidator>> &getValidator(const std::shared_ptr<minifi::state::response::Value> &input) {
+    const StandardValidators& init = get();
     if (std::dynamic_pointer_cast<core::DataSizeValue>(input) != nullptr) {
       return init.DATA_SIZE_VALIDATOR;
     } else if (std::dynamic_pointer_cast<core::TimePeriodValue>(input) != nullptr) {
@@ -348,31 +354,11 @@ class StandardValidators {
     } else if (std::dynamic_pointer_cast<minifi::state::response::UInt64Value>(input) != nullptr) {
       return init.UNSIGNED_LONG_VALIDATOR;
     } else {
-      return org::apache::nifi::minifi::core::StandardValidators::VALID_VALIDATOR();
+      return init.VALID_VALIDATOR;
     }
   }
 
-  static const gsl::not_null<std::shared_ptr<PropertyValidator>>& NON_BLANK_VALIDATOR() {
-    static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<NonBlankValidator>("NON_BLANK_VALIDATOR"));
-    return validator;
-  }
-
-  static const gsl::not_null<std::shared_ptr<PropertyValidator>>& VALID_VALIDATOR() {
-    static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<AlwaysValid>(true, "VALID"));
-    return validator;
-  }
-
-  static gsl::not_null<std::shared_ptr<PropertyValidator>> PORT_VALIDATOR() {
-    static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<PortValidator>("PORT_VALIDATOR"));
-    return validator;
-  }
-
-  static gsl::not_null<std::shared_ptr<PropertyValidator>> LISTEN_PORT_VALIDATOR() {
-    static gsl::not_null<std::shared_ptr<PropertyValidator>> validator(std::make_shared<ListenPortValidator>("PORT_VALIDATOR"));
-    return validator;
-  }
-
- private:
+ public:
   gsl::not_null<std::shared_ptr<PropertyValidator>> INVALID;
   gsl::not_null<std::shared_ptr<PropertyValidator>> INTEGER_VALIDATOR;
   gsl::not_null<std::shared_ptr<PropertyValidator>> UNSIGNED_INT_VALIDATOR;
@@ -382,7 +368,10 @@ class StandardValidators {
   gsl::not_null<std::shared_ptr<PropertyValidator>> DATA_SIZE_VALIDATOR;
   gsl::not_null<std::shared_ptr<PropertyValidator>> TIME_PERIOD_VALIDATOR;
 
-  StandardValidators();
+  gsl::not_null<std::shared_ptr<PropertyValidator>> NON_BLANK_VALIDATOR;
+  gsl::not_null<std::shared_ptr<PropertyValidator>> VALID_VALIDATOR;
+  gsl::not_null<std::shared_ptr<PropertyValidator>> PORT_VALIDATOR;
+  gsl::not_null<std::shared_ptr<PropertyValidator>> LISTEN_PORT_VALIDATOR;
 };
 
 }  // namespace core
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 34b0430..c393abb 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -21,6 +21,7 @@
 #include <iostream>
 #include <cstdint>
 #include <vector>
+#include <string>
 #include "BaseStream.h"
 
 namespace org {
@@ -37,6 +38,10 @@ class BufferStream : public BaseStream {
     write(buf, len);
   }
 
+  explicit BufferStream(const std::string& data) {
+    write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+  }
+
   using BaseStream::read;
   using BaseStream::write;
 
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
new file mode 100644
index 0000000..440b158
--- /dev/null
+++ b/libminifi/include/utils/Enum.h
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <cstring>
+#include <set>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+#define COMMA(...) ,
+#define MSVC_HACK(x) x
+
+#define PICK_(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, ...) _15
+#define COUNT(...) \
+  MSVC_HACK(PICK_(__VA_ARGS__, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0))
+
+#define CONCAT_(a, b) a ## b
+#define CONCAT(a, b) CONCAT_(a, b)
+
+#define CALL(Fn, ...) MSVC_HACK(Fn(__VA_ARGS__))
+#define SPREAD(...) __VA_ARGS__
+
+#define FOR_EACH(fn, delim, ARGS) \
+  CALL(CONCAT(FOR_EACH_, COUNT ARGS), fn, delim, SPREAD ARGS)
+
+#define FOR_EACH_0(...)
+#define FOR_EACH_1(fn, delim, _1) fn(_1)
+#define FOR_EACH_2(fn, delim, _1, _2) fn(_1) delim() fn(_2)
+#define FOR_EACH_3(fn, delim, _1, _2, _3) fn(_1) delim() fn(_2) delim() fn(_3)
+#define FOR_EACH_4(fn, delim, _1, _2, _3, _4) \
+  fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4)
+#define FOR_EACH_5(fn, delim, _1, _2, _3, _4, _5) \
+  fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() fn(_5)
+#define FOR_EACH_6(fn, delim, _1, _2, _3, _4, _5, _6) \
+  fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() \
+  fn(_5) delim() fn(_6)
+#define FOR_EACH_7(fn, delim, _1, _2, _3, _4, _5, _6, _7) \
+  fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() \
+  fn(_5) delim() fn(_6) delim() fn(_7)
+#define FOR_EACH_8(fn, delim, _1, _2, _3, _4, _5, _6, _7, _8) \
+  fn(_1) delim() fn(_2) delim() fn(_3) delim() fn(_4) delim() \
+  fn(_5) delim() fn(_6) delim() fn(_7) delim() fn(_8)
+
+#define FIRST_(a, b) a
+#define FIRST(x, ...) FIRST_ x
+#define SECOND_(a, b) b
+#define SECOND(x, ...) SECOND_ x
+#define NOTHING()
+
+#define INCLUDE_BASE_FIELD(x) \
+  x = Base::x
+
+#define SMART_ENUM_BODY(Clazz, ...) \
+    constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
+    Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+    Clazz(const char* str) : value_{parse(str).value_} {} \
+   private: \
+    Type value_; \
+   public: \
+    Type value() const { \
+      return value_; \
+    } \
+    struct detail : Base::detail { \
+      static std::set<std::string> values() { \
+        static constexpr const char* ownValues[]{ \
+          FOR_EACH(SECOND, COMMA, (__VA_ARGS__)) \
+        }; \
+        std::set<std::string> values = Base::detail::values(); \
+        for (auto value : ownValues) { \
+          values.emplace(value); \
+        } \
+        return values; \
+      } \
+      static const char* toStringImpl(Type a, const char* DerivedName) { \
+        static constexpr const char* values[]{ \
+          FOR_EACH(SECOND, COMMA, (__VA_ARGS__)) \
+        }; \
+        int index = static_cast<int>(a); \
+        if (Base::length <= index && index < length) { \
+          return values[index - Base::length]; \
+        } \
+        return Base::detail::toStringImpl(static_cast<Base::Type>(a), DerivedName); \
+      } \
+    }; \
+    static constexpr int length = Base::length + COUNT(__VA_ARGS__); \
+    friend const char* toString(Type a) { \
+      return detail::toStringImpl(a, #Clazz); \
+    } \
+    const char* toString() const { \
+      return detail::toStringImpl(value_, #Clazz); \
+    } \
+    static std::set<std::string> values() { \
+      return detail::values(); \
+    } \
+    bool operator==(Type val) const { \
+      return value_ == val; \
+    } \
+    bool operator!=(Type val) const { \
+      return value_ != val; \
+    } \
+    bool operator==(const Clazz& other) const { \
+      return value_ == other.value_; \
+    } \
+    bool operator!=(const Clazz& other) const { \
+      return value_ != other.value_; \
+    } \
+    bool operator<(const Clazz& other) const { \
+      return value_ < other.value_;\
+    } \
+    explicit operator bool() const { \
+      int idx = static_cast<int>(value_); \
+      return 0 <= idx && idx < length; \
+    } \
+    static Clazz parse(const char* str, bool throw_on_invalid = true) { \
+      for (int idx = 0; idx < length; ++idx) { \
+        if (std::strcmp(str, detail::toStringImpl(static_cast<Type>(idx), #Clazz)) == 0) \
+          return static_cast<Type>(idx); \
+      } \
+      if (throw_on_invalid) { \
+        throw std::runtime_error(std::string("Cannot convert \"") + str + "\" to " #Clazz); \
+      } \
+      return {}; \
+    } \
+    template<typename T, typename = typename std::enable_if<std::is_base_of<typename T::detail, detail>::value>::type> \
+    T cast() const { \
+      if (0 <= value_ && value_ < T::length) { \
+        return static_cast<typename T::Type>(value_); \
+      } \
+      return {}; \
+    }
+
+/**
+ * These macros provide an encapsulation of enum-like behavior offering the following:
+ *  - switch safety: the compiler can detect if some enum cases are not handled
+ *  - string conversion: convert between enum instances and their string representations (toString, parse)
+ *  - validity: check whether an instance holds a valid enum value (explicit operator bool)
+ *  - extensibility: extend an enum with new values and safely* cast from derived to base
+ *                   (* "safely" here means that there is no casting between unrelated enums)
+ *  - reflection: access the set of all string representations
+ */
+
+#define SMART_ENUM(Clazz, ...) \
+  struct Clazz { \
+    using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
+    enum Type { \
+      FOR_EACH(FIRST, COMMA, (__VA_ARGS__)) \
+    }; \
+    SMART_ENUM_BODY(Clazz, __VA_ARGS__) \
+  };
+
+#define SMART_ENUM_EXTEND(Clazz, base, base_fields, ...) \
+  struct Clazz { \
+    using Base = base; \
+    enum Type { \
+      FOR_EACH(INCLUDE_BASE_FIELD, COMMA, base_fields), \
+      FOR_EACH(FIRST, COMMA, (__VA_ARGS__)) \
+    }; \
+    static_assert((COUNT base_fields) == Base::length, "Must enumerate all base instance values"); \
+    SMART_ENUM_BODY(Clazz, __VA_ARGS__) \
+  };
+
+struct EnumBase {
+  enum Type {};
+  static constexpr int length = 0;
+  struct detail {
+    static std::set<std::string> values() {
+      return {};
+    }
+    static const char* toStringImpl(Type a, const char* DerivedName) {
+      throw std::runtime_error(std::string("Cannot stringify unknown instance in enum \"") + DerivedName + "\" : \""
+                               + std::to_string(static_cast<int>(a)) + "\"");
+    }
+  };
+};
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 1a373b5..2143247 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -331,6 +331,23 @@ class StringUtils {
   static bool from_hex(uint8_t ch, uint8_t& output);
 
   /**
+   * Creates a string that is a concatenation of count instances of the provided string.
+   * @tparam TChar char type of the string (char or wchar_t)
+   * @param str the string that is to be repeated
+   * @param count the number of times the string is repeated
+   * @return the result string
+   */
+  template<typename TChar>
+  static std::basic_string<TChar> repeat(const TChar* str, size_t count) {
+    return repeat(std::basic_string<TChar>(str), count);
+  }
+
+  template<typename TChar>
+  static std::basic_string<TChar> repeat(const std::basic_string<TChar>& str, size_t count) {
+    return join("", std::vector<std::basic_string<TChar>>(count, str));
+  }
+
+  /**
    * Hexdecodes the hexencoded string in data, ignoring every character that is not [0-9a-fA-F]
    * @param data the output buffer where the hexdecoded bytes will be written. Must be at least length / 2 bytes long.
    * @param data_length pointer to the length of data the data buffer. It will be filled with the length of the decoded bytes.
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index dc002f0..6a8273a 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -315,6 +315,9 @@ int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStrea
   }
 }
 
+void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) {
+  importFrom(stream, flow);
+}
 /**
  * Imports a file from the data stream
  * @param stream incoming data stream that contains the data to store into a file
diff --git a/libminifi/src/core/PropertyValidation.cpp b/libminifi/src/core/PropertyValidation.cpp
index 304aa39..1be431c 100644
--- a/libminifi/src/core/PropertyValidation.cpp
+++ b/libminifi/src/core/PropertyValidation.cpp
@@ -33,7 +33,11 @@ StandardValidators::StandardValidators()
     UNSIGNED_LONG_VALIDATOR(std::make_shared<UnsignedLongValidator>("LONG_VALIDATOR")),
     DATA_SIZE_VALIDATOR(std::make_shared<DataSizeValidator>("DATA_SIZE_VALIDATOR")),
     TIME_PERIOD_VALIDATOR(std::make_shared<TimePeriodValidator>("TIME_PERIOD_VALIDATOR")),
-    BOOLEAN_VALIDATOR(std::make_shared<BooleanValidator>("BOOLEAN_VALIDATOR")) {}
+    BOOLEAN_VALIDATOR(std::make_shared<BooleanValidator>("BOOLEAN_VALIDATOR")),
+    NON_BLANK_VALIDATOR(std::make_shared<NonBlankValidator>("NON_BLANK_VALIDATOR")),
+    VALID_VALIDATOR(std::make_shared<AlwaysValid>(true, "VALID")),
+    PORT_VALIDATOR(std::make_shared<PortValidator>("PORT_VALIDATOR")),
+    LISTEN_PORT_VALIDATOR(std::make_shared<ListenPortValidator>("PORT_VALIDATOR")) {}
 
 } /* namespace core */
 } /* namespace minifi */
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 9796bbd..228a135 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -14,8 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_TEST_UTILS_H_
-#define LIBMINIFI_TEST_UTILS_H_
+#pragma once
+
+#include <string>
 
 #define FIELD_ACCESSOR(field) \
   template<typename T> \
@@ -28,5 +29,3 @@
   static auto call_##method(T&& instance, Args&& ...args) -> decltype((std::forward<T>(instance).method(std::forward<Args>(args)...))) { \
     return std::forward<T>(instance).method(std::forward<Args>(args)...); \
   }
-
-#endif  // LIBMINIFI_TEST_UTILS_H_
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 446b71b..76507df 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -40,8 +40,9 @@
 #include "processors/LogAttribute.h"
 #include "processors/PutFile.h"
 #include "utils/file/FileUtils.h"
+#include "../Utils.h"
 
-class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+class ReadCallback: public minifi::InputStreamCallback {
  public:
   explicit ReadCallback(size_t size) :
       read_size_(0) {
@@ -56,7 +57,7 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
     if (archive_buffer_)
       delete[] archive_buffer_;
   }
-  int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     int64_t total_read = 0;
     int64_t ret = 0;
     do {
@@ -108,24 +109,24 @@ class CompressDecompressionTestController : public TestController{
   }
 
   void setupFlow() {
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<processors::CompressContent>();
+    LogTestController::getInstance().setTrace<processors::LogAttribute>();
     LogTestController::getInstance().setTrace<core::ProcessSession>();
     LogTestController::getInstance().setTrace<core::ProcessContext>();
     LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+    LogTestController::getInstance().setTrace<minifi::Connection>();
+    LogTestController::getInstance().setTrace<minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<minifi::io::FileStream>();
 
     std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-    processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    processor = std::make_shared<processors::CompressContent>("compresscontent");
     processor->initialize();
     utils::Identifier processoruuid = processor->getUUID();
     REQUIRE(processoruuid);
 
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+    content_repo->initialize(std::make_shared<minifi::Configure>());
     // connection from compress processor to log attribute
     output = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
     output->addRelationship(core::Relationship("success", "compress successful output"));
@@ -206,8 +207,8 @@ class CompressTestController : public CompressDecompressionTestController {
 
  public:
   CompressTestController() {
-    char format[] = "/tmp/test.XXXXXX";
-    tempDir_ = get_global_controller().createTempDirectory(format);
+    char CompressionFormat[] = "/tmp/test.XXXXXX";
+    tempDir_ = get_global_controller().createTempDirectory(CompressionFormat);
     REQUIRE(!tempDir_.empty());
     raw_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-expect-compresscontent.txt");
     compressed_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-compresscontent");
@@ -234,11 +235,14 @@ class DecompressTestController : public CompressDecompressionTestController{
   }
 };
 
+using CompressionFormat = processors::CompressContent::ExtendedCompressionFormat;
+using CompressionMode = processors::CompressContent::CompressionMode;
+
 TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -260,7 +264,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+    flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/gzip");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
@@ -273,10 +277,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1
 }
 
 TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfiletest2]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -298,7 +302,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+    REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string content(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -307,10 +311,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet
 }
 
 TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::BZIP2));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -332,7 +336,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+    flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/bzip2");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
@@ -346,10 +350,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3
 
 
 TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfiletest4]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::BZIP2));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -371,7 +375,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+    REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -380,10 +384,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet
 }
 
 TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::LZMA));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -411,7 +415,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+    flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/x-lzma");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
@@ -425,15 +429,15 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5
 
 
 TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfiletest6]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::USE_MIME_TYPE));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
   sessionGenFlowFile.import(compressedPath(), flow, true, 0);
-  flow->setAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma");
+  flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma");
   sessionGenFlowFile.flushContent();
   input->put(flow);
 
@@ -457,7 +461,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+    REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -466,10 +470,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet
 }
 
 TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletest7]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::XZ_LZMA2));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
@@ -497,7 +501,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime);
+    flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/x-xz");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
@@ -511,15 +515,15 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes
 
 
 TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfiletest8]") {
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::USE_MIME_TYPE));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
   sessionGenFlowFile.import(compressedPath(), flow, true, 0);
-  flow->setAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz");
+  flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz");
   sessionGenFlowFile.flushContent();
   input->put(flow);
 
@@ -543,7 +547,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil
   {
     REQUIRE(flow1->getSize() != flow->getSize());
     std::string mime;
-    REQUIRE(flow1->getAttribute(org::apache::nifi::minifi::core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
+    REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
@@ -552,8 +556,8 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil
 }
 
 TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfiletest8]") {
-  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-  LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
+  LogTestController::getInstance().setTrace<processors::CompressContent>();
+  LogTestController::getInstance().setTrace<processors::PutFile>();
 
   // Create temporary directories
   char format_src[] = "/tmp/archives.XXXXXX";
@@ -599,8 +603,8 @@ TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfi
   plan->setProperty(get_file, "Input Directory", src_dir);
 
   // Configure CompressContent processor for compression
-  plan->setProperty(compress_content, "Mode", MODE_COMPRESS);
-  plan->setProperty(compress_content, "Compression Format", COMPRESSION_FORMAT_GZIP);
+  plan->setProperty(compress_content, "Mode", toString(CompressionMode::Compress));
+  plan->setProperty(compress_content, "Compression Format", toString(CompressionFormat::GZIP));
   plan->setProperty(compress_content, "Update Filename", "true");
   plan->setProperty(compress_content, "Encapsulate in TAR", "false");
 
@@ -608,8 +612,8 @@ TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfi
   plan->setProperty(put_compressed, "Directory", dst_dir);
 
   // Configure CompressContent processor for decompression
-  plan->setProperty(decompress_content, "Mode", MODE_DECOMPRESS);
-  plan->setProperty(decompress_content, "Compression Format", COMPRESSION_FORMAT_GZIP);
+  plan->setProperty(decompress_content, "Mode", toString(CompressionMode::Decompress));
+  plan->setProperty(decompress_content, "Compression Format", toString(CompressionFormat::GZIP));
   plan->setProperty(decompress_content, "Update Filename", "true");
   plan->setProperty(decompress_content, "Encapsulate in TAR", "false");
 
@@ -653,3 +657,69 @@ TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfi
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFileBatchTest]") {
+  std::vector<std::string> flowFileContents{
+    utils::StringUtils::repeat("0", 1000), utils::StringUtils::repeat("1", 1000),
+    utils::StringUtils::repeat("2", 1000), utils::StringUtils::repeat("3", 1000),
+  };
+  const std::size_t batchSize = 3;
+
+  context->setProperty(processors::CompressContent::CompressMode, toString(CompressionMode::Compress));
+  context->setProperty(processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+  context->setProperty(processors::CompressContent::CompressLevel, "9");
+  context->setProperty(processors::CompressContent::UpdateFileName, "true");
+  context->setProperty(processors::CompressContent::BatchSize, std::to_string(batchSize));
+
+  // Queue all four flow files on `input` before the processor is triggered (one more than batchSize)
+  core::ProcessSession sessionGenFlowFile(context);
+  for (const auto& content : flowFileContents) {
+    auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(content), flow);
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
+  }
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+
+  // First trigger: expected to consume exactly batchSize of the queued flow files
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+
+  // Drain what has been emitted so far; only batchSize files should have come through
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::vector<std::shared_ptr<core::FlowFile>> outFiles;
+  while (std::shared_ptr<core::FlowFile> file = output->poll(expiredFlowRecords)) {
+    outFiles.push_back(std::move(file));
+  }
+  REQUIRE(outFiles.size() == batchSize);
+
+  // Trigger a second time to process the remaining files
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+
+  while (std::shared_ptr<core::FlowFile> file = output->poll(expiredFlowRecords)) {
+    outFiles.push_back(std::move(file));
+  }
+  REQUIRE(outFiles.size() == flowFileContents.size());
+  // Every output must be tagged gzip and decompress to the input at the same index (relies on output order == input order)
+  for (std::size_t idx = 0; idx < outFiles.size(); ++idx) {
+    auto file = outFiles[idx];
+    std::string mime;
+    file->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
+    REQUIRE(mime == "application/gzip");
+    ReadCallback callback(gsl::narrow<size_t>(file->getSize()));
+    sessionGenFlowFile.read(file, &callback);
+    callback.archive_read();
+    std::string content(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+    REQUIRE(flowFileContents[idx] == content);
+  }
+}
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index d8d2bed..384f9f1 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -38,6 +38,7 @@
 #include "../unit/ProvenanceTestHelper.h"
 #include "serialization/FlowFileV3Serializer.h"
 #include "serialization/PayloadSerializer.h"
+#include "../Utils.h"
 
 std::string FLOW_FILE;
 std::string EXPECT_MERGE_CONTENT_FIRST;
@@ -63,7 +64,7 @@ void init_file_paths() {
   static Initializer initializer;
 }
 
-class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
+class FixedBuffer : public minifi::InputStreamCallback {
  public:
   explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
     buf_.reset(new uint8_t[capacity_]);
@@ -95,7 +96,7 @@ class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
     } while (size_ != capacity_);
     return total_read;
   }
-  int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     return write(*stream.get(), capacity_);
   }
 
@@ -182,11 +183,16 @@ class MergeTestController : public TestController {
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
     context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor), nullptr, repo, repo, content_repo);
+
+    for (size_t i = 0; i < 6; ++i) {
+      flowFileContents[i] = utils::StringUtils::repeat(std::to_string(i), 32);
+    }
   }
   ~MergeTestController() {
     LogTestController::getInstance().reset();
   }
 
+  std::string flowFileContents[6];
   std::shared_ptr<core::ProcessContext> context;
   std::shared_ptr<core::Processor> processor;
   std::shared_ptr<minifi::Connection> input;
@@ -194,34 +200,20 @@ class MergeTestController : public TestController {
 };
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        if (i < 3)
-          expectfileFirst << std::to_string(i);
-        else
-          expectfileSecond << std::to_string(i);
-      }
-    }
-  }
+  std::string expected[2]{
+    flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+    flowFileContents[3] + flowFileContents[4] + flowFileContents[5]
+  };
 
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
 
   core::ProcessSession sessionGenFlowFile(context);
-  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  // Generate 6 flowfiles, first three merged to one, second three merged to one
   for (const int i : {0, 2, 5, 4, 1, 3}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     // three bundle
     if (i < 3)
       flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
@@ -251,67 +243,38 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]")
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
-    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
     sessionGenFlowFile.read(flow2, &callback);
-    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[1]);
   }
 }
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefiletest2]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-    std::ofstream headerfile(HEADER_FILE, std::ios::binary);
-    std::ofstream footerfile(FOOTER_FILE, std::ios::binary);
-    std::ofstream demarcatorfile(DEMARCATOR_FILE, std::ios::binary);
-    headerfile << "header";
-    expectfileFirst << "header";
-    expectfileSecond << "header";
-    footerfile << "footer";
-    demarcatorfile << "demarcator";
-
-
-    // Create and write to the test file
-    for (int i = 0; i < 6; i++) {
-      if (i != 0 && i <= 2)
-        expectfileFirst << "demarcator";
-      if (i != 3 && i >= 4)
-        expectfileSecond << "demarcator";
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        if (i < 3)
-          expectfileFirst << std::to_string(i);
-        else
-          expectfileSecond << std::to_string(i);
-      }
-    }
-    expectfileFirst << "footer";
-    expectfileSecond << "footer";
-  }
+  std::string expected[2]{
+    "header" + flowFileContents[0] + "demarcator" + flowFileContents[1] + "demarcator" + flowFileContents[2] + "footer",
+    "header" + flowFileContents[3] + "demarcator" + flowFileContents[4] + "demarcator" + flowFileContents[5] + "footer"
+  };
+
+  std::ofstream(HEADER_FILE, std::ios::binary) << "header";
+  std::ofstream(FOOTER_FILE, std::ios::binary) << "footer";
+  std::ofstream(DEMARCATOR_FILE, std::ios::binary) << "demarcator";
 
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_FILENAME);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_FILENAME);
+  context->setProperty(processors::MergeContent::Header, HEADER_FILE);
+  context->setProperty(processors::MergeContent::Footer, FOOTER_FILE);
+  context->setProperty(processors::MergeContent::Demarcator, DEMARCATOR_FILE);
 
   core::ProcessSession sessionGenFlowFile(context);
-  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  // Generate 6 flowfiles, first three merged to one, second three merged to one
   for (const int i : {0, 2, 5, 4, 1, 3}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     // three bundle
     if (i < 3)
       flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
@@ -342,52 +305,33 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefil
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
-    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 128);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
     sessionGenFlowFile.read(flow2, &callback);
-    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[1]);
   }
 }
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefiletest3]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
-    // Create and write to the test file, drop record 4
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        if (i < 3)
-          expectfileFirst << std::to_string(i);
-        else
-          expectfileSecond << std::to_string(i);
-      }
-    }
-  }
+  // drop record 4
+  std::string expected[2]{
+    flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+    flowFileContents[3] + flowFileContents[5]
+  };
 
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MaxBinAge, "1 sec");
 
   core::ProcessSession sessionGenFlowFile(context);
   // Generate 5 flowfiles, first threes merged to one, the other two merged to one
   for (const int i : {0, 2, 5, 1, 3}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     // three bundle
     if (i < 3)
       flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
@@ -424,51 +368,33 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefile
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
-    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 64);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
     sessionGenFlowFile.read(flow2, &callback);
-    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[1]);
   }
 }
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        if (i < 3)
-          expectfileFirst << std::to_string(i);
-        else
-          expectfileSecond << std::to_string(i);
-      }
-    }
-  }
+  std::string expected[2]{
+    flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+    flowFileContents[3] + flowFileContents[4] + flowFileContents[5]
+  };
 
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MinSize, "96");
+  context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
   for (const int i : {0, 1, 2, 3, 4, 5}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     flow->setAttribute("tag", "tag");
     sessionGenFlowFile.flushContent();
     input->put(flow);
@@ -490,52 +416,29 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
-    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
     sessionGenFlowFile.read(flow2, &callback);
-    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[1]);
   }
 }
 
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        if (i < 3)
-          expectfileFirst << std::to_string(i);
-        else
-          expectfileSecond << std::to_string(i);
-      }
-    }
-  }
-
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MinSize, "96");
+  context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
   for (const int i : {0, 1, 2, 3, 4, 5}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     flow->setAttribute("tag", "tag");
     sessionGenFlowFile.flushContent();
     input->put(flow);
@@ -560,10 +463,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 0; i < 3; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ifstream file1(flowFileName, std::ios::binary);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      REQUIRE(archives[i].to_string() == contents);
+      REQUIRE(archives[i].to_string() == flowFileContents[i]);
     }
   }
   REQUIRE(flow2->getSize() > 0);
@@ -573,45 +473,23 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 3; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ifstream file1(flowFileName, std::ios::binary);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      REQUIRE(archives[i-3].to_string() == contents);
+      REQUIRE(archives[i-3].to_string() == flowFileContents[i]);
     }
   }
 }
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        if (i < 3)
-          expectfileFirst << std::to_string(i);
-        else
-          expectfileSecond << std::to_string(i);
-      }
-    }
-  }
-
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_ZIP_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_ZIP_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MinSize, "96");
+  context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
   for (const int i : {0, 1, 2, 3, 4, 5}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     flow->setAttribute("tag", "tag");
     sessionGenFlowFile.flushContent();
     input->put(flow);
@@ -636,10 +514,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 0; i < 3; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ifstream file1(flowFileName, std::ios::binary);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      REQUIRE(archives[i].to_string() == contents);
+      REQUIRE(archives[i].to_string() == flowFileContents[i]);
     }
   }
   REQUIRE(flow2->getSize() > 0);
@@ -649,41 +524,28 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 3; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ifstream file1(flowFileName, std::ios::binary);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      REQUIRE(archives[i-3].to_string() == contents);
+      REQUIRE(archives[i-3].to_string() == flowFileContents[i]);
     }
   }
 }
 
 TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 6; i++) {
-      std::ofstream{std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt", std::ios::binary} << std::to_string(i);
-      if (i % 2 == 0)
-        expectfileFirst << std::to_string(i);
-      else
-        expectfileSecond << std::to_string(i);
-    }
-  }
+  std::string expected[2]{
+      flowFileContents[0] + flowFileContents[2] + flowFileContents[4],
+      flowFileContents[1] + flowFileContents[3] + flowFileContents[5]
+  };
 
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinEntries, "3");
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MinEntries, "3");
+  context->setProperty(processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
   // Generate 6 flowfiles, even files are merged to one, odd files are merged to an other
   for (const int i : {0, 1, 2, 3, 4, 5}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     if (i % 2 == 0)
       flow->setAttribute("tag", "even");
     else
@@ -707,45 +569,26 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]"
   {
     FixedBuffer callback(flow1->getSize());
     sessionGenFlowFile.read(flow1, &callback);
-    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[0]);
   }
   {
     FixedBuffer callback(flow2->getSize());
     sessionGenFlowFile.read(flow2, &callback);
-    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
-    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-    REQUIRE(callback.to_string() == contents);
+    REQUIRE(callback.to_string() == expected[1]);
   }
 }
 
 TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 3; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        expectfileFirst << std::to_string(i);
-      }
-    }
-  }
-
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
 
   core::ProcessSession sessionGenFlowFile(context);
 
   // Generate 3 flowfiles merging all into one
   for (const int i : {1, 2, 0}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
     flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
@@ -785,31 +628,16 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping Only C
 }
 
 TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
-  {
-    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
-
-    // Create and write to the test file
-    for (int i = 0; i < 3; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      std::ofstream tmpfile(flowFileName.c_str(), std::ios::binary);
-      for (int j = 0; j < 32; j++) {
-        tmpfile << std::to_string(i);
-        expectfileFirst << std::to_string(i);
-      }
-    }
-  }
-
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
-  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::AttributeStrategy, org::apache::nifi::minifi::processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::AttributeStrategy, processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
 
   core::ProcessSession sessionGenFlowFile(context);
   // Generate 3 flowfiles merging all into one
   for (const int i : {1, 2, 0}) {
-    const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
-    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
     flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
     flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
@@ -942,3 +770,51 @@ TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]") {
+  std::string expected[2]{
+    flowFileContents[0] + flowFileContents[1] + flowFileContents[2],
+    flowFileContents[3] + flowFileContents[4]
+  };
+
+  context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::BinFiles::BatchSize, "3");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  // enqueue 5 (five) flowFiles
+  for (const int i : {0, 1, 2, 3, 4}) {
+    const auto flow = sessionGenFlowFile.create();
+    sessionGenFlowFile.importFrom(minifi::io::BufferStream(flowFileContents[i]), flow);
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
+  }
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  // two triggers are enough to process all five flowFiles
+  for (int i = 0; i < 2; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(expiredFlowRecords.size() == 0);
+  REQUIRE(flow1);
+  {
+    FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
+    sessionGenFlowFile.read(flow1, &callback);
+    REQUIRE(callback.to_string() == expected[0]);
+  }
+  REQUIRE(flow2);
+  {
+    FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
+    sessionGenFlowFile.read(flow2, &callback);
+    REQUIRE(callback.to_string() == expected[1]);
+  }
+}
diff --git a/libminifi/test/unit/EnumTests.cpp b/libminifi/test/unit/EnumTests.cpp
new file mode 100644
index 0000000..574c768
--- /dev/null
+++ b/libminifi/test/unit/EnumTests.cpp
@@ -0,0 +1,105 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+#include "utils/Enum.h"
+#include "catch.hpp"
+
+// we need this instead of void_t in GeneralUtils because of GCC4.8
+template<typename ...>
+struct make_void {
+  using type = void;
+};
+
+#define CHECK_COMPILE(name, clazz, expr, result) \
+  template<typename, typename = void> \
+  struct does_compile ## name : std::false_type {};\
+  template<typename T> \
+  struct does_compile ## name<T, typename make_void<decltype(std::declval<T>().template expr)>::type> : std::true_type {}; \
+  static_assert(does_compile ## name<clazz>::value == result, "");
+
+SMART_ENUM(A,
+  (_0, "zero"),
+  (_1, "one")
+)
+
+SMART_ENUM_EXTEND(B, A, (_0, _1),
+  (_2, "two")
+)
+
+SMART_ENUM_EXTEND(C, B, (_0, _1, _2),
+  (_3, "three")
+)
+
+SMART_ENUM(Unrelated,
+  (a, "a"),
+  (b, "b")
+)
+
+// static tests: an extended enum must be castable to each enum it (transitively) extends
+namespace test {
+
+CHECK_COMPILE(_1, B, cast<A>(), true)
+CHECK_COMPILE(_2, C, cast<A>(), true)
+CHECK_COMPILE(_3, C, cast<B>(), true)
+
+// casting to an unrelated enum must not compile, at any depth of the extension chain
+CHECK_COMPILE(_7, B, cast<Unrelated>(), false)
+CHECK_COMPILE(_8, C, cast<Unrelated>(), false)
+
+}  // namespace test
+
+TEST_CASE("Enum checks") {
+  REQUIRE(!A{});
+  REQUIRE(!B{});
+  REQUIRE(A{A::Type(0)});
+  REQUIRE(B{B::Type(1)});
+
+  REQUIRE(A::values() == (std::set<std::string>{"zero", "one"}));
+  REQUIRE(B::values() == (std::set<std::string>{"zero", "one", "two"}));
+  REQUIRE(C::values() == (std::set<std::string>{"zero", "one", "two", "three"}));
+
+  REQUIRE_THROWS(A::parse("not_any"));
+  REQUIRE(!A::parse("not_any", false));
+  REQUIRE(A::parse("zero") == A::_0);
+  REQUIRE(B::parse("zero") == B::_0);
+  REQUIRE(C::parse("one") == C::_1);
+  REQUIRE(C::parse("three") == C::_3);
+  REQUIRE_THROWS(C::parse("nada"));
+  REQUIRE(!C::parse("nada", false));
+
+  REQUIRE(A{A::_0}.toString() == std::string{"zero"});
+  REQUIRE(toString(A::_0) == std::string{"zero"});
+  REQUIRE(B{B::_0}.toString() == std::string{"zero"});
+  REQUIRE(toString(B::_0) == std::string{"zero"});
+  REQUIRE(toString(B::_2) == std::string{"two"});
+  REQUIRE(toString(C::_1) == std::string{"one"});
+  REQUIRE(toString(C::_3) == std::string{"three"});
+  REQUIRE_THROWS(toString(A::Type(55)));
+  REQUIRE_THROWS(toString(C::Type(-1)));
+  REQUIRE_THROWS(C{}.toString());
+
+  REQUIRE(B{B::_0}.cast<A>() == A{A::_0});
+  REQUIRE(C{C::_1}.cast<A>() == A::_1);
+  REQUIRE(C{C::_2}.cast<B>() == B::_2);
+  REQUIRE(!C{C::_3}.cast<B>());
+  REQUIRE(!C{C::_3}.cast<A>());
+  REQUIRE(!B{B::_2}.cast<A>());
+}