You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/05/22 08:15:54 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

adamdebreceni opened a new pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r429677443



##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -37,88 +37,112 @@
 #include "../TestBase.h"
 #include "../unit/ProvenanceTestHelper.h"
 
-static const char* FLOW_FILE = "/tmp/minifi-mergecontent";
-static const char* EXPECT_MERGE_CONTENT_FIRST = "/tmp/minifi-expect-mergecontent1.txt";
-static const char* EXPECT_MERGE_CONTENT_SECOND = "/tmp/minifi-expect-mergecontent2.txt";
-static const char* HEADER_FILE = "/tmp/minifi-mergecontent.header";
-static const char* FOOTER_FILE = "/tmp/minifi-mergecontent.footer";
-static const char* DEMARCATOR_FILE = "/tmp/minifi-mergecontent.demarcator";
-
-class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+std::string FLOW_FILE;
+std::string EXPECT_MERGE_CONTENT_FIRST;
+std::string EXPECT_MERGE_CONTENT_SECOND;
+std::string HEADER_FILE;
+std::string FOOTER_FILE;
+std::string DEMARCATOR_FILE;
+
+void init_file_paths() {
+  struct Initializer {
+    Initializer() {
+      static TestController global_controller;
+      char format[] = "/tmp/test.XXXXXX";
+      std::string tempDir = global_controller.createTempDirectory(format);
+      FLOW_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent");
+      EXPECT_MERGE_CONTENT_FIRST = utils::file::FileUtils::concat_path(tempDir, "minifi-expect-mergecontent1.txt");
+      EXPECT_MERGE_CONTENT_SECOND = utils::file::FileUtils::concat_path(tempDir, "minifi-expect-mergecontent2.txt");
+      HEADER_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent.header");
+      FOOTER_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent.footer");
+      DEMARCATOR_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent.demarcator");
+    }
+  };
+  static Initializer initializer;
+}
+
+class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
  public:
-  explicit ReadCallback(uint64_t size) :
-      read_size_(0) {
-    buffer_size_ = size;
-    buffer_ = new uint8_t[buffer_size_];
-    archive_buffer_num_ = 0;
+  explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
+    buf_ = new uint8_t[capacity_];

Review comment:
       Is there any reason that prevents using a unique_ptr here?

##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -275,785 +312,451 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
         expectfileSecond << "demarcator";
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
     expectfileFirst << "footer";
     expectfileSecond << "footer";
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[4]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 128);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 128);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-    unlink(FOOTER_FILE);
-    unlink(HEADER_FILE);
-    unlink(DEMARCATOR_FILE);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[4]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 128);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
   }
+  REQUIRE(flow2->getSize() == 128);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file, drop record 4
     for (int i = 0; i < 6; i++) {
       if (i == 4)
         continue;
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
+  }
 
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
-    {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 64);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 64);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 96);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 96);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 
 TEST_CASE("MergeFileTar", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 0; i < 3; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 0; i < 3; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i]), callback.archive_buffer_size_[i]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
-    }
-    REQUIRE(flow2->getSize() > 0);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 3; i < 6; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i-3]), callback.archive_buffer_size_[i-3]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      REQUIRE(archives[i].to_string() == contents);
     }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
+  }
+  REQUIRE(flow2->getSize() > 0);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 3; i < 6; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      REQUIRE(archives[i-3].to_string() == contents);
     }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
   }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileZip", "[mergefiletest5]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_ZIP_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);

Review comment:
       This part also seems to be very similar across all the test cases in this file.
   Would it be possible to extract it into a function to avoid the copy-pasting?
   
   The changes here look good; my goal is just to improve the code further.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r430528786



##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -275,785 +312,451 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
         expectfileSecond << "demarcator";
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
     expectfileFirst << "footer";
     expectfileSecond << "footer";
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[4]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 128);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 128);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-    unlink(FOOTER_FILE);
-    unlink(HEADER_FILE);
-    unlink(DEMARCATOR_FILE);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[4]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 128);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
   }
+  REQUIRE(flow2->getSize() == 128);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file, drop record 4
     for (int i = 0; i < 6; i++) {
       if (i == 4)
         continue;
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
+  }
 
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
-    {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 64);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 64);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 96);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 96);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 
 TEST_CASE("MergeFileTar", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 0; i < 3; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 0; i < 3; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i]), callback.archive_buffer_size_[i]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
-    }
-    REQUIRE(flow2->getSize() > 0);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 3; i < 6; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i-3]), callback.archive_buffer_size_[i-3]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      REQUIRE(archives[i].to_string() == contents);
     }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
+  }
+  REQUIRE(flow2->getSize() > 0);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 3; i < 6; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      REQUIRE(archives[i-3].to_string() == contents);
     }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
   }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileZip", "[mergefiletest5]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_ZIP_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);

Review comment:
       Fair




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

Posted by GitBox <gi...@apache.org>.
arpadboda commented on pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#issuecomment-634121541


   LGTM


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r429768326



##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -37,88 +37,112 @@
 #include "../TestBase.h"
 #include "../unit/ProvenanceTestHelper.h"
 
-static const char* FLOW_FILE = "/tmp/minifi-mergecontent";
-static const char* EXPECT_MERGE_CONTENT_FIRST = "/tmp/minifi-expect-mergecontent1.txt";
-static const char* EXPECT_MERGE_CONTENT_SECOND = "/tmp/minifi-expect-mergecontent2.txt";
-static const char* HEADER_FILE = "/tmp/minifi-mergecontent.header";
-static const char* FOOTER_FILE = "/tmp/minifi-mergecontent.footer";
-static const char* DEMARCATOR_FILE = "/tmp/minifi-mergecontent.demarcator";
-
-class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
+std::string FLOW_FILE;
+std::string EXPECT_MERGE_CONTENT_FIRST;
+std::string EXPECT_MERGE_CONTENT_SECOND;
+std::string HEADER_FILE;
+std::string FOOTER_FILE;
+std::string DEMARCATOR_FILE;
+
+void init_file_paths() {
+  struct Initializer {
+    Initializer() {
+      static TestController global_controller;
+      char format[] = "/tmp/test.XXXXXX";
+      std::string tempDir = global_controller.createTempDirectory(format);
+      FLOW_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent");
+      EXPECT_MERGE_CONTENT_FIRST = utils::file::FileUtils::concat_path(tempDir, "minifi-expect-mergecontent1.txt");
+      EXPECT_MERGE_CONTENT_SECOND = utils::file::FileUtils::concat_path(tempDir, "minifi-expect-mergecontent2.txt");
+      HEADER_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent.header");
+      FOOTER_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent.footer");
+      DEMARCATOR_FILE = utils::file::FileUtils::concat_path(tempDir, "minifi-mergecontent.demarcator");
+    }
+  };
+  static Initializer initializer;
+}
+
+class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
  public:
-  explicit ReadCallback(uint64_t size) :
-      read_size_(0) {
-    buffer_size_ = size;
-    buffer_ = new uint8_t[buffer_size_];
-    archive_buffer_num_ = 0;
+  explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
+    buf_ = new uint8_t[capacity_];

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #792: MINIFICPP-1230 - Enable on Win and refactor MergeFileTests

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #792:
URL: https://github.com/apache/nifi-minifi-cpp/pull/792#discussion_r429760668



##########
File path: libminifi/test/archive-tests/MergeFileTests.cpp
##########
@@ -275,785 +312,451 @@ TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
         expectfileSecond << "demarcator";
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
     expectfileFirst << "footer";
     expectfileSecond << "footer";
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, "/tmp/minifi-mergecontent.header");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, "/tmp/minifi-mergecontent.footer");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, "/tmp/minifi-mergecontent.demarcator");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[4]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 128);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 128);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-    unlink(FOOTER_FILE);
-    unlink(HEADER_FILE);
-    unlink(DEMARCATOR_FILE);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_FILENAME);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Header, HEADER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Footer, FOOTER_FILE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[4]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 128);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
   }
+  REQUIRE(flow2->getSize() == 128);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file, drop record 4
     for (int i = 0; i < 6; i++) {
       if (i == 4)
         continue;
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
+  }
 
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      // three bundle
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
-      if (i < 3)
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
-      else
-        flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
-      flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[2]);
-    income_connection->put(record[5]);
-    income_connection->put(record[1]);
-    income_connection->put(record[3]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
-    {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 64);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      if (i == 4)
-        continue;
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_DEFRAGMENT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    // three bundle
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(0));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_ID_ATTRIBUTE, std::to_string(1));
+    if (i < 3)
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i));
+    else
+      flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
+    flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[2]);
+  input->put(record[5]);
+  input->put(record[1]);
+  input->put(record[3]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    if (i == 4)
+      continue;
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 64);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() == 96);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      std::ifstream file1;
-      file1.open(EXPECT_MERGE_CONTENT_FIRST, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file1.close();
-    }
-    REQUIRE(flow2->getSize() == 96);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      std::ifstream file2;
-      file2.open(EXPECT_MERGE_CONTENT_SECOND, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      REQUIRE(expectContents == contents);
-      file2.close();
-    }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
-      std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
-    }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles: the first three are merged into one, the second three into another
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
   }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() == 96);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  REQUIRE(flow2->getSize() == 96);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+    REQUIRE(callback.to_string() == contents);
+  }
+  LogTestController::getInstance().reset();
 }
 
 
 TEST_CASE("MergeFileTar", "[mergefiletest4]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    for (int i = 0; i < 6; i++) {
-      std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_TAR_VALUE);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
+  context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+  core::ProcessSession sessionGenFlowFile(context);
+  std::shared_ptr<core::FlowFile> record[6];
+
+  // Generate 6 flowfiles: the first three are merged into one, the second three into another
+  for (int i = 0; i < 6; i++) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+    std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+    sessionGenFlowFile.import(flowFileName, flow, true, 0);
+    flow->setAttribute("tag", "tag");
+    record[i] = flow;
+  }
+  input->put(record[0]);
+  input->put(record[1]);
+  input->put(record[2]);
+  input->put(record[3]);
+  input->put(record[4]);
+  input->put(record[5]);
+
+  REQUIRE(processor->getName() == "mergecontent");
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 6; i++) {
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+  // validate the merge content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+  std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+  REQUIRE(flow1->getSize() > 0);
+  {
+    FixedBuffer callback(flow1->getSize());
+    sessionGenFlowFile.read(flow1, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 0; i < 3; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      sessionGenFlowFile.import(flowFileName, flow, true, 0);
-      flow->setAttribute("tag", "tag");
-      record[i] = flow;
-    }
-    income_connection->put(record[0]);
-    income_connection->put(record[1]);
-    income_connection->put(record[2]);
-    income_connection->put(record[3]);
-    income_connection->put(record[4]);
-    income_connection->put(record[5]);
-
-    REQUIRE(processor->getName() == "mergecontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    for (int i = 0; i < 6; i++) {
-      auto session = std::make_shared<core::ProcessSession>(context);
-      processor->onTrigger(context, session);
-      session->commit();
-    }
-    // validate the merge content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    std::shared_ptr<core::FlowFile> flow2 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 0; i < 3; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i]), callback.archive_buffer_size_[i]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
-    }
-    REQUIRE(flow2->getSize() > 0);
-    {
-      ReadCallback callback(flow2->getSize());
-      sessionGenFlowFile.read(flow2, &callback);
-      callback.archive_read();
-      REQUIRE(callback.archive_buffer_num_ == 3);
-      for (int i = 3; i < 6; i++) {
-        std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-        std::ifstream file1;
-        file1.open(flowFileName, std::ios::in);
-        std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-        std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_[i-3]), callback.archive_buffer_size_[i-3]);
-        REQUIRE(expectContents == contents);
-        file1.close();
-      }
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      REQUIRE(archives[i].to_string() == contents);
     }
-    LogTestController::getInstance().reset();
-    for (int i = 0; i < 6; i++) {
+  }
+  REQUIRE(flow2->getSize() > 0);
+  {
+    FixedBuffer callback(flow2->getSize());
+    sessionGenFlowFile.read(flow2, &callback);
+    auto archives = read_archives(callback);
+    REQUIRE(archives.size() == 3);
+    for (int i = 3; i < 6; i++) {
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      unlink(flowFileName.c_str());
+      std::ifstream file1(flowFileName, std::ios::binary);
+      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+      REQUIRE(archives[i-3].to_string() == contents);
     }
-    unlink(EXPECT_MERGE_CONTENT_FIRST);
-    unlink(EXPECT_MERGE_CONTENT_SECOND);
-  } catch (...) {
   }
+  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("MergeFileZip", "[mergefiletest5]") {
-  try {
-    std::ofstream expectfileFirst;
-    std::ofstream expectfileSecond;
-    expectfileFirst.open(EXPECT_MERGE_CONTENT_FIRST);
-    expectfileSecond.open(EXPECT_MERGE_CONTENT_SECOND);
+  {
+    std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+    std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
 
     // Create and write to the test file
     for (int i = 0; i < 6; i++) {
       std::ofstream tmpfile;
       std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
-      tmpfile.open(flowFileName.c_str());
+      tmpfile.open(flowFileName.c_str(), std::ios::binary);
       for (int j = 0; j < 32; j++) {
         tmpfile << std::to_string(i);
         if (i < 3)
           expectfileFirst << std::to_string(i);
         else
           expectfileSecond << std::to_string(i);
       }
-      tmpfile.close();
     }
-    expectfileFirst.close();
-    expectfileSecond.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from merge processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("merged", "Merge successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to merge processor
-    std::shared_ptr<minifi::Connection> mergeconnection = std::make_shared<minifi::Connection>(repo, content_repo, "mergeconnection");
-    mergeconnection->setDestination(processor);
-    mergeconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(mergeconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
+  }
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_ZIP_VALUE);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStratgey, DELIMITER_STRATEGY_TEXT);
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinSize, "96");
-    context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::FlowFile> record[6];
-
-    // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);

Review comment:
       It seems to me that I have already moved this one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org