You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/05/21 14:00:58 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #788: MINIFICPP-1229 - Fix and enable CompressContentTests

adamdebreceni commented on a change in pull request #788:
URL: https://github.com/apache/nifi-minifi-cpp/pull/788#discussion_r428669878



##########
File path: libminifi/test/archive-tests/CompressContentTests.cpp
##########
@@ -91,851 +91,826 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
   int archive_buffer_size_;
 };
 
-TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
-  try {
-    std::ofstream expectfile;
-    expectfile.open(EXPECT_COMPRESS_CONTENT);
+class CompressDecompressionTestController : public TestController{
+protected:
+  static std::string tempDir;
+  static std::string raw_content_path;
+  static std::string compressed_content_path;
+  static TestController global_controller;
+public:
+  class RawContent{
+    std::string content_;
+    RawContent(std::string&& content_): content_(std::move(content_)) {}
+    friend class CompressDecompressionTestController;
+  public:
+    bool operator==(const std::string& actual) const noexcept {
+      return content_ == actual;
+    }
+    bool operator!=(const std::string& actual) const noexcept {
+      return content_ != actual;
+    }
+  };
 
-    std::mt19937 gen(std::random_device { }());
+  std::string rawContentPath() const {
+    return raw_content_path;
+  }
+
+  std::string compressedPath() const {
+    return compressed_content_path;
+  }
+
+  RawContent getRawContent() const {;
+    std::ifstream file;
+    file.open(raw_content_path, std::ios::binary);
+    std::string contents((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
+    file.close();
+    return {std::move(contents)};
+  }
+
+  virtual ~CompressDecompressionTestController() = 0;
+};
+
+CompressDecompressionTestController::~CompressDecompressionTestController() {}
+
+std::string CompressDecompressionTestController::tempDir = "";
+std::string CompressDecompressionTestController::raw_content_path = "";
+std::string CompressDecompressionTestController::compressed_content_path = "";
+TestController CompressDecompressionTestController::global_controller = {};
+
+class CompressTestController : public CompressDecompressionTestController{
+  void initContentWithRandomData(){
+    std::ofstream file;
+    file.open(raw_content_path, std::ios::binary);
+
+    std::mt19937 gen((int)0x454);
     for (int i = 0; i < 100000; i++) {
-      expectfile << std::to_string(gen() % 100);
-    }
-    expectfile.close();
-
-    TestController testController;
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
-    LogTestController::getInstance().setTrace<core::ProcessSession>();
-    LogTestController::getInstance().setTrace<core::ProcessContext>();
-    LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-    processor->initialize();
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    // connection from compress processor to log attribute
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "compress successful output"));
-    connection->setSource(processor);
-    connection->setDestination(logAttributeProcessor);
-    connection->setSourceUUID(processoruuid);
-    connection->setDestinationUUID(logAttributeuuid);
-    processor->addConnection(connection);
-    // connection to compress processor
-    std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
-    compressconnection->setDestination(processor);
-    compressconnection->setDestinationUUID(processoruuid);
-    processor->addConnection(compressconnection);
-
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-    processor->incrementActiveTasks();
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    logAttributeProcessor->incrementActiveTasks();
-    logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
-    context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
-    core::ProcessSession sessionGenFlowFile(context);
-    std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
-    std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
-    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-    sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
-    income_connection->put(flow);
-
-    REQUIRE(processor->getName() == "compresscontent");
-    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-    processor->onSchedule(context, factory);
-    auto session = std::make_shared<core::ProcessSession>(context);
-    processor->onTrigger(context, session);
-    session->commit();
-
-    // validate the compress content
-    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
-    std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
-    REQUIRE(flow1->getSize() > 0);
-    {
-      REQUIRE(flow1->getSize() != flow->getSize());
-      std::string mime;
-      flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
-      REQUIRE(mime == "application/gzip");
-      ReadCallback callback(flow1->getSize());
-      sessionGenFlowFile.read(flow1, &callback);
-      callback.archive_read();
-      std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
-      std::ifstream file1;
-      file1.open(flowFileName, std::ios::in);
-      std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
-      std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-      REQUIRE(expectContents == contents);
-      // write the compress content for next test
-      std::ofstream file(COMPRESS_CONTENT);
-      file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-      file.close();
-      file1.close();
+      file << std::to_string(gen() % 100);
     }
-    LogTestController::getInstance().reset();
-  } catch (...) {
+    file.close();
+  }
+public:
+  CompressTestController(){
+    char format[] = "/tmp/test.XXXXXX";
+    tempDir = global_controller.createTempDirectory(format);
+    REQUIRE(!tempDir.empty());
+    raw_content_path = utils::file::FileUtils::concat_path(tempDir, "minifi-expect-compresscontent.txt");
+    compressed_content_path = utils::file::FileUtils::concat_path(tempDir, "minifi-compresscontent");
+    initContentWithRandomData();
   }
+
+  template<class ...Args>
+  void writeCompressed(Args&& ...args){
+    std::ofstream file(compressed_content_path, std::ios::binary);
+    file.write(std::forward<Args>(args)...);
+    file.close();
+  }
+};
+
+class DecompressTestController : public CompressDecompressionTestController{
+public:
+  ~DecompressTestController(){
+    tempDir = "";
+    raw_content_path = "";
+    compressed_content_path = "";

Review comment:
       The problem is that there is a pairwise dependency between the tests: the compression result of one test is decompressed in the second. It is certainly solvable; the question is whether we want to do it as part of this issue.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org