You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/09 13:14:39 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1229 - Enable compression tests on Windows; enable debug build;

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new ccdd0aa  MINIFICPP-1229 - Enable compression tests on Windows; enable debug build;
ccdd0aa is described below

commit ccdd0aa11cbec4943583015eda8f2054761f3bc0
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue May 19 13:44:18 2020 +0200

    MINIFICPP-1229 - Enable compression tests on Windows; enable debug build;
    
    MINIFICPP-1229 - Enable archive tests.
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #788
---
 .../test/archive-tests/CompressContentTests.cpp    | 1260 +++++++-------------
 win_build_vs.bat                                   |    8 +-
 2 files changed, 461 insertions(+), 807 deletions(-)

diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 92ce2af..45b55ca 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,9 +41,6 @@
 #include "processors/PutFile.h"
 #include "utils/file/FileUtils.h"
 
-static const char* EXPECT_COMPRESS_CONTENT = "/tmp/minifi-expect-compresscontent.txt";
-static const char* COMPRESS_CONTENT = "/tmp/minifi-compresscontent";
-
 class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
  public:
   explicit ReadCallback(uint64_t size) :
@@ -51,6 +48,7 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
     buffer_size_ = size;
     buffer_ = new uint8_t[buffer_size_];
     archive_buffer_ = nullptr;
+    archive_buffer_size_ = 0;
   }
   ~ReadCallback() {
     if (buffer_)
@@ -59,13 +57,16 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
       delete[] archive_buffer_;
   }
   int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+    int64_t total_read = 0;
     int64_t ret = 0;
-    ret = stream->read(buffer_, buffer_size_);
-    if (stream)
-      read_size_ = stream->getSize();
-    else
-      read_size_ = buffer_size_;
-    return ret;
+    do {
+      ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      read_size_ += ret;
+      total_read += ret;
+    } while (buffer_size_ != read_size_);
+    return total_read;
   }
   void archive_read() {
     struct archive *a;
@@ -75,12 +76,11 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
     archive_read_open_memory(a, buffer_, read_size_);
     struct archive_entry *ae;
 
-    if (archive_read_next_header(a, &ae) == ARCHIVE_OK) {
-      int size = archive_entry_size(ae);
-      archive_buffer_ = new char[size];
-      archive_buffer_size_ = size;
-      archive_read_data(a, archive_buffer_, size);
-    }
+    REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
+    int size = archive_entry_size(ae);
+    archive_buffer_ = new char[size];
+    archive_buffer_size_ = size;
+    archive_read_data(a, archive_buffer_, size);
     archive_read_free(a);
   }
 
@@ -91,18 +91,23 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
   int archive_buffer_size_;
 };
 
-TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
-  try {
-    std::ofstream expectfile;
-    expectfile.open(EXPECT_COMPRESS_CONTENT);
-
-    std::mt19937 gen(std::random_device { }());
-    for (int i = 0; i < 100000; i++) {
-      expectfile << std::to_string(gen() % 100);
-    }
-    expectfile.close();
+/**
+ * There is strong coupling between these compression and decompression
+ * tests. Some compression tests also set up the stage for the subsequent
+ * decompression test. Each such test controller should be either a
+ * CompressTestController or a DecompressTestController.
+ */
+class CompressDecompressionTestController : public TestController{
+ protected:
+  static std::string tempDir_;
+  static std::string raw_content_path_;
+  static std::string compressed_content_path_;
+  static TestController& get_global_controller() {
+    static TestController controller;
+    return controller;
+  }
 
-    TestController testController;
+  void setupFlow() {
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
     LogTestController::getInstance().setTrace<core::ProcessSession>();
@@ -110,832 +115,488 @@ TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
     LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
     LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
 
     std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
+    processor_ = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    processor_->initialize();
     utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+    REQUIRE(true == processor_->getUUID(processoruuid));
 
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-
     content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
     // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
+    output_ = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
+    output_->addRelationship(core::Relationship("success", "compress successful output"));
+    output_->setSource(processor_);
+    output_->setSourceUUID(processoruuid);
+    processor_->addConnection(output_);
     // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
-      REQUIRE(mime == "application/gzip");
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-      REQUIRE(expectContents == contents);
-      // write the compress content for next test
-      std::ofstream file(COMPRESS_CONTENT);
-      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      file.close();
-      file1.close();
-    }
-    LogTestController::getInstance().reset();
-  } catch (...) {
-  }
-}
-
-TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {
-  try {
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::ProcessContext>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+    input_ = std::make_shared<minifi::Connection>(repo, content_repo, "Input");
+    input_->setDestination(processor_);
+    input_->setDestinationUUID(processoruuid);
+    processor_->addConnection(input_);
 
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+    processor_->setAutoTerminatedRelationships({{"failure", ""}});
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+    processor_->incrementActiveTasks();
+    processor_->setScheduledState(core::ScheduledState::RUNNING);
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor_);
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    LogTestController::getInstance().reset();
-    unlink(COMPRESS_CONTENT);
-    unlink(EXPECT_COMPRESS_CONTENT);
-  } catch (...) {
+    context_ = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
   }
-}
 
-TEST_CASE("CompressFileBZip", "[compressfiletest3]") {
-  try {
-    std::ofstream expectfile;
-    expectfile.open(EXPECT_COMPRESS_CONTENT);
-
-    std::mt19937 gen(std::random_device { }());
-    for (int i = 0; i < 100000; i++) {
-      expectfile << std::to_string(gen() % 100);
+ public:
+  class RawContent{
+    std::string content_;
+    explicit RawContent(std::string&& content_): content_(std::move(content_)) {}
+    friend class CompressDecompressionTestController;
+   public:
+    bool operator==(const std::string& actual) const noexcept {
+      return content_ == actual;
     }
-    expectfile.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+    bool operator!=(const std::string& actual) const noexcept {
+      return content_ != actual;
+    }
+  };
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  std::string rawContentPath() const {
+    return raw_content_path_;
+  }
 
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
-      REQUIRE(mime == "application/bzip2");
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-      REQUIRE(expectContents == contents);
-      // write the compress content for next test
-      std::ofstream file(COMPRESS_CONTENT);
-      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      file.close();
-      file1.close();
-    }
-    LogTestController::getInstance().reset();
-  } catch (...) {
+  std::string compressedPath() const {
+    return compressed_content_path_;
   }
-}
 
+  RawContent getRawContent() const {
+    std::ifstream file;
+    file.open(raw_content_path_, std::ios::binary);
+    std::string contents{std::istreambuf_iterator<char>(file), std::istreambuf_iterator<char>()};
+    return RawContent{std::move(contents)};
+  }
 
-TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {
-  try {
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+  virtual ~CompressDecompressionTestController() = 0;
 
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Processor> processor_;
+  std::shared_ptr<core::ProcessContext> context_;
+  std::shared_ptr<minifi::Connection> output_;
+  std::shared_ptr<minifi::Connection> input_;
+};
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+CompressDecompressionTestController::~CompressDecompressionTestController() = default;
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    LogTestController::getInstance().reset();
-    unlink(COMPRESS_CONTENT);
-    unlink(EXPECT_COMPRESS_CONTENT);
-  } catch (...) {
-  }
-}
+std::string CompressDecompressionTestController::tempDir_;
+std::string CompressDecompressionTestController::raw_content_path_;
+std::string CompressDecompressionTestController::compressed_content_path_;
 
-TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {
-  try {
-    std::ofstream expectfile;
-    expectfile.open(EXPECT_COMPRESS_CONTENT);
+class CompressTestController : public CompressDecompressionTestController {
+  static void initContentWithRandomData() {
+    int random_seed = 0x454;
+    std::ofstream file;
+    file.open(raw_content_path_, std::ios::binary);
 
-    std::mt19937 gen(std::random_device { }());
+    std::mt19937 gen(random_seed);
+    std::uniform_int_distribution<> dis(0, 99);
     for (int i = 0; i < 100000; i++) {
-      expectfile << std::to_string(gen() % 100);
+      file << std::to_string(dis(gen));
     }
-    expectfile.close();
+  }
 
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+ public:
+  CompressTestController() {
+    char format[] = "/tmp/test.XXXXXX";
+    tempDir_ = get_global_controller().createTempDirectory(format);
+    REQUIRE(!tempDir_.empty());
+    raw_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-expect-compresscontent.txt");
+    compressed_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-compresscontent");
+    initContentWithRandomData();
+    setupFlow();
+  }
 
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+  template<class ...Args>
+  void writeCompressed(Args&& ...args) {
+    std::ofstream file(compressed_content_path_, std::ios::binary);
+    file.write(std::forward<Args>(args)...);
+  }
+};
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+class DecompressTestController : public CompressDecompressionTestController{
+ public:
+  DecompressTestController() {
+    setupFlow();
+  }
+  ~DecompressTestController() {
+    tempDir_ = "";
+    raw_content_path_ = "";
+    compressed_content_path_ = "";
+  }
+};
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
+  CompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  // validate the compressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+    REQUIRE(mime == "application/gzip");
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    callback.archive_read();
+    std::string content(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+    REQUIRE(testController.getRawContent() == content);
+    // write the compressed content for the next test
+    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+  }
+  LogTestController::getInstance().reset();
+}
 
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
-      // platform not support LZMA
-      LogTestController::getInstance().reset();
-      return;
-    }
+TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {
+  DecompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  // validate the decompressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::string content(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(testController.getRawContent() == content);
+  }
+  LogTestController::getInstance().reset();
+}
 
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
-      REQUIRE(mime == "application/x-lzma");
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-      REQUIRE(expectContents == contents);
-      // write the compress content for next test
-      std::ofstream file(COMPRESS_CONTENT);
-      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      file.close();
-      file1.close();
-    }
-    LogTestController::getInstance().reset();
-  } catch (...) {
+TEST_CASE("CompressFileBZip", "[compressfiletest3]") {
+  CompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  // validate the compressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+    REQUIRE(mime == "application/bzip2");
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    callback.archive_read();
+    std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+    REQUIRE(testController.getRawContent() == contents);
+    // write the compressed content for the next test
+    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
   }
+  LogTestController::getInstance().reset();
 }
 
 
-TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
-  try {
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {
+  DecompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  // validate the decompressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(testController.getRawContent() == contents);
+  }
+  LogTestController::getInstance().reset();
+}
 
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {
+  CompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+    // platform does not support LZMA
+    LogTestController::getInstance().reset();
+    return;
+  }
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+  // validate the compressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+    REQUIRE(mime == "application/x-lzma");
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    callback.archive_read();
+    std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+    REQUIRE(testController.getRawContent() == contents);
+    // write the compressed content for the next test
+    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+  }
+  LogTestController::getInstance().reset();
+}
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
-    flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
-      // platform not support LZMA
-      LogTestController::getInstance().reset();
-      return;
-    }
 
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
+TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
+  DecompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+    // platform does not support LZMA
     LogTestController::getInstance().reset();
-    unlink(COMPRESS_CONTENT);
-    unlink(EXPECT_COMPRESS_CONTENT);
-  } catch (...) {
+    return;
+  }
+
+  // validate the decompressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(testController.getRawContent() == contents);
   }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
-  try {
-    std::ofstream expectfile;
-    expectfile.open(EXPECT_COMPRESS_CONTENT);
-
-    std::mt19937 gen(std::random_device { }());
-    for (int i = 0; i < 100000; i++) {
-      expectfile << std::to_string(gen() % 100);
-    }
-    expectfile.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
-      // platform not support LZMA
-      LogTestController::getInstance().reset();
-      return;
-    }
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
-      REQUIRE(mime == "application/x-xz");
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-      REQUIRE(expectContents == contents);
-      // write the compress content for next test
-      std::ofstream file(COMPRESS_CONTENT);
-      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      file.close();
-      file1.close();
-    }
+  CompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+    // platform does not support LZMA
     LogTestController::getInstance().reset();
-  } catch (...) {
+    return;
+  }
+
+  // validate the compressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+    REQUIRE(mime == "application/x-xz");
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    callback.archive_read();
+    std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+    REQUIRE(testController.getRawContent() == contents);
+    // write the compressed content for the next test
+    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
   }
+  LogTestController::getInstance().reset();
 }
 
 
 TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
-  try {
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
-    flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    if (LogTestController::getInstance().contains("compression not supported on this platform")) {
-      // platform not support LZMA
-      LogTestController::getInstance().reset();
-      return;
-    }
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
+  DecompressTestController testController;
+  auto context = testController.context_;
+  auto input = testController.input_;
+  auto processor = testController.processor_;
+  auto output = testController.output_;
+
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
+  input->put(flow);
+
+  REQUIRE(processor->getName() == "compresscontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  auto session = std::make_shared<core::ProcessSession>(context);
+  processor->onTrigger(context, session);
+  session->commit();
+
+  if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+    // platform does not support LZMA
     LogTestController::getInstance().reset();
-    unlink(COMPRESS_CONTENT);
-    unlink(EXPECT_COMPRESS_CONTENT);
-  } catch (...) {
+    return;
   }
+
+  // validate the decompressed content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    REQUIRE(flow1->getSize() != flow->getSize());
+    std::string mime;
+    REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+    ReadCallback callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    REQUIRE(testController.getRawContent() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("RawGzipCompressionDecompression", "[compressfiletest8]") {
@@ -1019,10 +680,7 @@ TEST_CASE("RawGzipCompressionDecompression", "[compressfiletest8]") {
     content = content_ss.str();
   }
 
-  std::fstream file;
-  file.open(src_file, std::ios::out);
-  file << content;
-  file.close();
+  std::ofstream{ src_file } << content;
 
   // Run flow
   testController.runSession(plan, true);
diff --git a/win_build_vs.bat b/win_build_vs.bat
index f63b6da..a3039dd 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -27,7 +27,6 @@ set build_kafka=OFF
 set build_coap=OFF
 set build_jni=OFF
 set build_SQL=OFF
-set disable_libarchive=ON
 set generator="Visual Studio 15 2017"
 set cpack=OFF
 set installer_merge_modules=OFF
@@ -51,9 +50,6 @@ for %%x in (%*) do (
 	if [%%~x] EQU [/S] ( 
 	set build_SQL=ON
     )
-	if [%%~x] EQU [/A] ( 
-	set disable_libarchive=OFF
-    )
 	if [%%~x] EQU [/M] ( 
 	set installer_merge_modules=ON
     )
@@ -72,14 +68,14 @@ for %%x in (%*) do (
 mkdir %builddir%
 pushd %builddir%\
 
-cmake -G %generator% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DENABLE_SQL=%build_SQL% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON  -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=%disable_libarchive% -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DEN [...]
+cmake -G %generator% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DENABLE_SQL=%build_SQL% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON  -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=OFF -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DENABLE_WEL=TRUE -DF [...]
 IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
 if [%cpack%] EQU [ON] ( 
 	cpack
 	IF !ERRORLEVEL! NEQ 0 ( popd & exit /b !ERRORLEVEL! )
 )
 if [%skiptests%] NEQ [ON] ( 
-	ctest -C %cmake_build_type% --output-on-failure
+	ctest --parallel 8 -C %cmake_build_type% --output-on-failure
 	IF !ERRORLEVEL! NEQ 0 ( popd & exit /b !ERRORLEVEL! )
 )
 popd