You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/09 13:14:39 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-1229 - Enable
compression tests on Windows; enable debug build;
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new ccdd0aa MINIFICPP-1229 - Enable compression tests on Windows; enable debug build;
ccdd0aa is described below
commit ccdd0aa11cbec4943583015eda8f2054761f3bc0
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue May 19 13:44:18 2020 +0200
MINIFICPP-1229 - Enable compression tests on Windows; enable debug build;
MINIFICPP-1229 - Enable archive tests.
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #788
---
.../test/archive-tests/CompressContentTests.cpp | 1260 +++++++-------------
win_build_vs.bat | 8 +-
2 files changed, 461 insertions(+), 807 deletions(-)
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 92ce2af..45b55ca 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,9 +41,6 @@
#include "processors/PutFile.h"
#include "utils/file/FileUtils.h"
-static const char* EXPECT_COMPRESS_CONTENT = "/tmp/minifi-expect-compresscontent.txt";
-static const char* COMPRESS_CONTENT = "/tmp/minifi-compresscontent";
-
class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
public:
explicit ReadCallback(uint64_t size) :
@@ -51,6 +48,7 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
buffer_size_ = size;
buffer_ = new uint8_t[buffer_size_];
archive_buffer_ = nullptr;
+ archive_buffer_size_ = 0;
}
~ReadCallback() {
if (buffer_)
@@ -59,13 +57,16 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
delete[] archive_buffer_;
}
int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+ int64_t total_read = 0;
int64_t ret = 0;
- ret = stream->read(buffer_, buffer_size_);
- if (stream)
- read_size_ = stream->getSize();
- else
- read_size_ = buffer_size_;
- return ret;
+ do {
+ ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
+ if (ret == 0) break;
+ if (ret < 0) return ret;
+ read_size_ += ret;
+ total_read += ret;
+ } while (buffer_size_ != read_size_);
+ return total_read;
}
void archive_read() {
struct archive *a;
@@ -75,12 +76,11 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
archive_read_open_memory(a, buffer_, read_size_);
struct archive_entry *ae;
- if (archive_read_next_header(a, &ae) == ARCHIVE_OK) {
- int size = archive_entry_size(ae);
- archive_buffer_ = new char[size];
- archive_buffer_size_ = size;
- archive_read_data(a, archive_buffer_, size);
- }
+ REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
+ int size = archive_entry_size(ae);
+ archive_buffer_ = new char[size];
+ archive_buffer_size_ = size;
+ archive_read_data(a, archive_buffer_, size);
archive_read_free(a);
}
@@ -91,18 +91,23 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
int archive_buffer_size_;
};
-TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
- try {
- std::ofstream expectfile;
- expectfile.open(EXPECT_COMPRESS_CONTENT);
-
- std::mt19937 gen(std::random_device { }());
- for (int i = 0; i < 100000; i++) {
- expectfile << std::to_string(gen() % 100);
- }
- expectfile.close();
+/**
+ * There is strong coupling between these compression and decompression
+ * tests. Some compression tests also set the stage for the subsequent
+ * decompression test. Each such test controller should be either a
+ * CompressTestController or a DecompressTestController.
+ */
+class CompressDecompressionTestController : public TestController{
+ protected:
+ static std::string tempDir_;
+ static std::string raw_content_path_;
+ static std::string compressed_content_path_;
+ static TestController& get_global_controller() {
+ static TestController controller;
+ return controller;
+ }
- TestController testController;
+ void setupFlow() {
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
LogTestController::getInstance().setTrace<core::ProcessSession>();
@@ -110,832 +115,488 @@ TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+ LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
+ processor_ = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+ processor_->initialize();
utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ REQUIRE(true == processor_->getUUID(processoruuid));
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
+ output_ = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
+ output_->addRelationship(core::Relationship("success", "compress successful output"));
+ output_->setSource(processor_);
+ output_->setSourceUUID(processoruuid);
+ processor_->addConnection(output_);
// connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
- REQUIRE(mime == "application/gzip");
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- callback.archive_read();
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(expectContents == contents);
- // write the compress content for next test
- std::ofstream file(COMPRESS_CONTENT);
- file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- file.close();
- file1.close();
- }
- LogTestController::getInstance().reset();
- } catch (...) {
- }
-}
-
-TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {
- try {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::ProcessContext>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+ input_ = std::make_shared<minifi::Connection>(repo, content_repo, "Input");
+ input_->setDestination(processor_);
+ input_->setDestinationUUID(processoruuid);
+ processor_->addConnection(input_);
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ processor_->setAutoTerminatedRelationships({{"failure", ""}});
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ processor_->incrementActiveTasks();
+ processor_->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor_);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(expectContents == contents);
- file1.close();
- }
- LogTestController::getInstance().reset();
- unlink(COMPRESS_CONTENT);
- unlink(EXPECT_COMPRESS_CONTENT);
- } catch (...) {
+ context_ = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
}
-}
-TEST_CASE("CompressFileBZip", "[compressfiletest3]") {
- try {
- std::ofstream expectfile;
- expectfile.open(EXPECT_COMPRESS_CONTENT);
-
- std::mt19937 gen(std::random_device { }());
- for (int i = 0; i < 100000; i++) {
- expectfile << std::to_string(gen() % 100);
+ public:
+ class RawContent{
+ std::string content_;
+ explicit RawContent(std::string&& content_): content_(std::move(content_)) {}
+ friend class CompressDecompressionTestController;
+ public:
+ bool operator==(const std::string& actual) const noexcept {
+ return content_ == actual;
}
- expectfile.close();
-
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ bool operator!=(const std::string& actual) const noexcept {
+ return content_ != actual;
+ }
+ };
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ std::string rawContentPath() const {
+ return raw_content_path_;
+ }
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
- REQUIRE(mime == "application/bzip2");
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- callback.archive_read();
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(expectContents == contents);
- // write the compress content for next test
- std::ofstream file(COMPRESS_CONTENT);
- file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- file.close();
- file1.close();
- }
- LogTestController::getInstance().reset();
- } catch (...) {
+ std::string compressedPath() const {
+ return compressed_content_path_;
}
-}
+ RawContent getRawContent() const {
+ std::ifstream file;
+ file.open(raw_content_path_, std::ios::binary);
+ std::string contents{std::istreambuf_iterator<char>(file), std::istreambuf_iterator<char>()};
+ return RawContent{std::move(contents)};
+ }
-TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {
- try {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+ virtual ~CompressDecompressionTestController() = 0;
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Processor> processor_;
+ std::shared_ptr<core::ProcessContext> context_;
+ std::shared_ptr<minifi::Connection> output_;
+ std::shared_ptr<minifi::Connection> input_;
+};
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+CompressDecompressionTestController::~CompressDecompressionTestController() = default;
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(expectContents == contents);
- file1.close();
- }
- LogTestController::getInstance().reset();
- unlink(COMPRESS_CONTENT);
- unlink(EXPECT_COMPRESS_CONTENT);
- } catch (...) {
- }
-}
+std::string CompressDecompressionTestController::tempDir_;
+std::string CompressDecompressionTestController::raw_content_path_;
+std::string CompressDecompressionTestController::compressed_content_path_;
-TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {
- try {
- std::ofstream expectfile;
- expectfile.open(EXPECT_COMPRESS_CONTENT);
+class CompressTestController : public CompressDecompressionTestController {
+ static void initContentWithRandomData() {
+ int random_seed = 0x454;
+ std::ofstream file;
+ file.open(raw_content_path_, std::ios::binary);
- std::mt19937 gen(std::random_device { }());
+ std::mt19937 gen(random_seed);
+ std::uniform_int_distribution<> dis(0, 99);
for (int i = 0; i < 100000; i++) {
- expectfile << std::to_string(gen() % 100);
+ file << std::to_string(dis(gen));
}
- expectfile.close();
+ }
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+ public:
+ CompressTestController() {
+ char format[] = "/tmp/test.XXXXXX";
+ tempDir_ = get_global_controller().createTempDirectory(format);
+ REQUIRE(!tempDir_.empty());
+ raw_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-expect-compresscontent.txt");
+ compressed_content_path_ = utils::file::FileUtils::concat_path(tempDir_, "minifi-compresscontent");
+ initContentWithRandomData();
+ setupFlow();
+ }
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ template<class ...Args>
+ void writeCompressed(Args&& ...args) {
+ std::ofstream file(compressed_content_path_, std::ios::binary);
+ file.write(std::forward<Args>(args)...);
+ }
+};
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+class DecompressTestController : public CompressDecompressionTestController{
+ public:
+ DecompressTestController() {
+ setupFlow();
+ }
+ ~DecompressTestController() {
+ tempDir_ = "";
+ raw_content_path_ = "";
+ compressed_content_path_ = "";
+ }
+};
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+TEST_CASE("CompressFileGZip", "[compressfiletest1]") {  // Compresses the raw fixture as gzip, then checks the MIME type and that unpacking yields the raw content.
+ CompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);  // load the raw fixture file into the new flow file
+ input->put(flow);  // hand it over through the controller's input_
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ // validate the compressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+ REQUIRE(mime == "application/gzip");
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ callback.archive_read();  // presumably unpacks into archive_buffer_, which the next line reads - TODO confirm
+ std::string content(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+ REQUIRE(testController.getRawContent() == content);
+ // write the compressed content for the next test
+ testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // stores buffer_ (bytes as read from the flow file), not archive_buffer_
+ }
+ LogTestController::getInstance().reset();
+}
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- if (LogTestController::getInstance().contains("compression not supported on this platform")) {
- // platform not support LZMA
- LogTestController::getInstance().reset();
- return;
- }
+TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {  // Decompresses the gzip file at compressedPath() and checks the result equals the raw content.
+ DecompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);  // NOTE(review): needs a compressed file to exist already; presumably the one written by CompressFileGZip - confirm
+ input->put(flow);
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ // validate the decompressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);  // the MIME type attribute must not be present on the output
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::string content(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // the bytes read are compared directly, no archive_read() here
+ REQUIRE(testController.getRawContent() == content);
+ }
+ LogTestController::getInstance().reset();
+}
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
- REQUIRE(mime == "application/x-lzma");
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- callback.archive_read();
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(expectContents == contents);
- // write the compress content for next test
- std::ofstream file(COMPRESS_CONTENT);
- file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- file.close();
- file1.close();
- }
- LogTestController::getInstance().reset();
- } catch (...) {
+TEST_CASE("CompressFileBZip", "[compressfiletest3]") {  // Compresses the raw fixture as bzip2, then checks the MIME type and that unpacking yields the raw content.
+ CompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);  // load the raw fixture file into the new flow file
+ input->put(flow);  // hand it over through the controller's input_
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ // validate the compressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+ REQUIRE(mime == "application/bzip2");
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ callback.archive_read();  // presumably unpacks into archive_buffer_, which the next line reads - TODO confirm
+ std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+ REQUIRE(testController.getRawContent() == contents);
+ // write the compressed content for the next test
+ testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // stores buffer_ (bytes as read from the flow file), not archive_buffer_
}
+ LogTestController::getInstance().reset();
}
-TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
- try {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
+TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {  // Decompresses the bzip2 file at compressedPath() and checks the result equals the raw content.
+ DecompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);  // NOTE(review): needs a compressed file to exist already; presumably the one written by CompressFileBZip - confirm
+ input->put(flow);
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ // validate the decompressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);  // the MIME type attribute must not be present on the output
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // the bytes read are compared directly, no archive_read() here
+ REQUIRE(testController.getRawContent() == contents);
+ }
+ LogTestController::getInstance().reset();
+}
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {  // Compresses the raw fixture as LZMA; returns early when the log reports the format as unsupported.
+ CompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);  // load the raw fixture file into the new flow file
+ input->put(flow);  // hand it over through the controller's input_
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+ // platform does not support LZMA
+ LogTestController::getInstance().reset();
+ return;  // nothing is validated in this case; the test ends without a failure
+ }
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ // validate the compressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+ REQUIRE(mime == "application/x-lzma");
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ callback.archive_read();  // presumably unpacks into archive_buffer_, which the next line reads - TODO confirm
+ std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+ REQUIRE(testController.getRawContent() == contents);
+ // write the compressed content for the next test
+ testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // stores buffer_ (bytes as read from the flow file), not archive_buffer_
+ }
+ LogTestController::getInstance().reset();
+}
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
- flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- if (LogTestController::getInstance().contains("compression not supported on this platform")) {
- // platform not support LZMA
- LogTestController::getInstance().reset();
- return;
- }
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(expectContents == contents);
- file1.close();
- }
+TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {  // Decompresses the file at compressedPath() with the input tagged as application/x-lzma; returns early if unsupported.
+ DecompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);  // presumably makes the processor take the format from the MIME type attribute set below - TODO confirm
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);  // NOTE(review): needs a compressed file to exist already; presumably the one written by CompressFileLZMA - confirm
+ flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");  // tag the input with the LZMA MIME type
+ input->put(flow);
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+ // platform does not support LZMA
LogTestController::getInstance().reset();
- unlink(COMPRESS_CONTENT);
- unlink(EXPECT_COMPRESS_CONTENT);
- } catch (...) {
+ return;  // nothing is validated in this case; the test ends without a failure
+ }
+
+ // validate the decompressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);  // the MIME type attribute must not be present on the output
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // the bytes read are compared directly, no archive_read() here
+ REQUIRE(testController.getRawContent() == contents);
}
+ LogTestController::getInstance().reset();
}
TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
- try {
- std::ofstream expectfile;
- expectfile.open(EXPECT_COMPRESS_CONTENT);
-
- std::mt19937 gen(std::random_device { }());
- for (int i = 0; i < 100000; i++) {
- expectfile << std::to_string(gen() % 100);
- }
- expectfile.close();
-
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
-
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(EXPECT_COMPRESS_CONTENT, flow, true, 0);
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- if (LogTestController::getInstance().contains("compression not supported on this platform")) {
- // platform not support LZMA
- LogTestController::getInstance().reset();
- return;
- }
-
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
- REQUIRE(mime == "application/x-xz");
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- callback.archive_read();
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(expectContents == contents);
- // write the compress content for next test
- std::ofstream file(COMPRESS_CONTENT);
- file.write(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- file.close();
- file1.close();
- }
+ CompressTestController testController;  // Test purpose: compress the raw fixture as XZ (LZMA2), check MIME type and unpacked content; return early if unsupported.
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);  // load the raw fixture file into the new flow file
+ input->put(flow);  // hand it over through the controller's input_
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);  // single processor run
+ session->commit();
+
+ if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+ // platform does not support LZMA
LogTestController::getInstance().reset();
- } catch (...) {
+ return;  // nothing is validated in this case; the test ends without a failure
+ }
+
+ // validate the compressed content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);  // NOTE(review): flow1 is dereferenced below without a null check
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());  // output size must differ from the input size
+ std::string mime;
+ flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime);
+ REQUIRE(mime == "application/x-xz");
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ callback.archive_read();  // presumably unpacks into archive_buffer_, which the next line reads - TODO confirm
+ std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
+ REQUIRE(testController.getRawContent() == contents);
+ // write the compressed content for the next test
+ testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);  // stores buffer_ (bytes as read from the flow file), not archive_buffer_
}
+ LogTestController::getInstance().reset();
}
TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
- try {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- // LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::io::FileStream>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- // std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- // connection from compress processor to log attribute
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "compress successful output"));
- connection->setSource(processor);
- connection->setDestination(logAttributeProcessor);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logAttributeuuid);
- processor->addConnection(connection);
- // connection to compress processor
- std::shared_ptr<minifi::Connection> compressconnection = std::make_shared<minifi::Connection>(repo, content_repo, "compressconnection");
- compressconnection->setDestination(processor);
- compressconnection->setDestinationUUID(processoruuid);
- processor->addConnection(compressconnection);
-
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- logAttributeProcessor->incrementActiveTasks();
- logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
- context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
-
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::Connectable> income = node->getNextIncomingConnection();
- std::shared_ptr<minifi::Connection> income_connection = std::static_pointer_cast<minifi::Connection>(income);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(COMPRESS_CONTENT, flow, true, 0);
- flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
- income_connection->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
-
- if (LogTestController::getInstance().contains("compression not supported on this platform")) {
- // platform not support LZMA
- LogTestController::getInstance().reset();
- return;
- }
-
- // validate the compress content
- std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
- std::shared_ptr<core::FlowFile> flow1 = connection->poll(expiredFlowRecords);
- REQUIRE(flow1->getSize() > 0);
- {
- REQUIRE(flow1->getSize() != flow->getSize());
- std::string mime;
- REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
- ReadCallback callback(flow1->getSize());
- sessionGenFlowFile.read(flow1, &callback);
- std::string flowFileName = std::string(EXPECT_COMPRESS_CONTENT);
- std::ifstream file1;
- file1.open(flowFileName, std::ios::in);
- std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
- std::string expectContents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(expectContents == contents);
- file1.close();
- }
+ DecompressTestController testController;
+ auto context = testController.context_;
+ auto input = testController.input_;
+ auto processor = testController.processor_;
+ auto output = testController.output_;
+
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
+ context->setProperty(org::apache::nifi::minifi::processors::CompressContent::UpdateFileName, "true");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+ flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
+ input->put(flow);
+
+ REQUIRE(processor->getName() == "compresscontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);
+ session->commit();
+
+ if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+ // platform not support LZMA
LogTestController::getInstance().reset();
- unlink(COMPRESS_CONTENT);
- unlink(EXPECT_COMPRESS_CONTENT);
- } catch (...) {
+ return;
}
+
+ // validate the compress content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+ REQUIRE(flow1->getSize() > 0);
+ {
+ REQUIRE(flow1->getSize() != flow->getSize());
+ std::string mime;
+ REQUIRE(flow1->getAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), mime) == false);
+ ReadCallback callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ REQUIRE(testController.getRawContent() == contents);
+ }
+ LogTestController::getInstance().reset();
}
TEST_CASE("RawGzipCompressionDecompression", "[compressfiletest8]") {
@@ -1019,10 +680,7 @@ TEST_CASE("RawGzipCompressionDecompression", "[compressfiletest8]") {
content = content_ss.str();
}
- std::fstream file;
- file.open(src_file, std::ios::out);
- file << content;
- file.close();
+ std::ofstream{ src_file } << content;
// Run flow
testController.runSession(plan, true);
diff --git a/win_build_vs.bat b/win_build_vs.bat
index f63b6da..a3039dd 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -27,7 +27,6 @@ set build_kafka=OFF
set build_coap=OFF
set build_jni=OFF
set build_SQL=OFF
-set disable_libarchive=ON
set generator="Visual Studio 15 2017"
set cpack=OFF
set installer_merge_modules=OFF
@@ -51,9 +50,6 @@ for %%x in (%*) do (
if [%%~x] EQU [/S] (
set build_SQL=ON
)
- if [%%~x] EQU [/A] (
- set disable_libarchive=OFF
- )
if [%%~x] EQU [/M] (
set installer_merge_modules=ON
)
@@ -72,14 +68,14 @@ for %%x in (%*) do (
mkdir %builddir%
pushd %builddir%\
-cmake -G %generator% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DENABLE_SQL=%build_SQL% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=%disable_libarchive% -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DEN [...]
+cmake -G %generator% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DENABLE_SQL=%build_SQL% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=OFF -DDISABLE_SCRIPTING=ON -DEXCLUDE_BOOST=ON -DENABLE_WEL=TRUE -DF [...]
IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
if [%cpack%] EQU [ON] (
cpack
IF !ERRORLEVEL! NEQ 0 ( popd & exit /b !ERRORLEVEL! )
)
if [%skiptests%] NEQ [ON] (
- ctest -C %cmake_build_type% --output-on-failure
+ ctest --parallel 8 -C %cmake_build_type% --output-on-failure
IF !ERRORLEVEL! NEQ 0 ( popd & exit /b !ERRORLEVEL! )
)
popd