You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/19 17:33:20 UTC
[1/2] nifi-minifi-cpp git commit: MINIFI-289: Update test folder to
apply linter and set max characters to 200
Repository: nifi-minifi-cpp
Updated Branches:
refs/heads/master bc0d65e1f -> 63c53bcfd
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/PropertyTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/PropertyTests.cpp b/libminifi/test/unit/PropertyTests.cpp
index cd686a1..a2d5858 100644
--- a/libminifi/test/unit/PropertyTests.cpp
+++ b/libminifi/test/unit/PropertyTests.cpp
@@ -17,88 +17,89 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "../../include/core/Property.h"
+#include <string>
#include "utils/StringUtils.h"
#include "../TestBase.h"
-
TEST_CASE("Test Boolean Conversion", "[testboolConversion]") {
-
- bool b;
- REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("true",b));
- REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("True",b));
- REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("TRue",b));
- REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("tRUE",b));
-
- REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FALSE",b));
- REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FALLSEY",b));
- REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FaLSE",b));
- REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("false",b));
-
+ bool b;
+ REQUIRE(
+ true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("true", b));
+ REQUIRE(
+ true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("True", b));
+ REQUIRE(
+ true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("TRue", b));
+ REQUIRE(
+ true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("tRUE", b));
+ REQUIRE(
+ false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FALSE", b));
+ REQUIRE(
+ false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FALLSEY", b));
+ REQUIRE(
+ false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FaLSE", b));
+ REQUIRE(
+ false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("false", b));
}
TEST_CASE("Test Trimmer Right", "[testTrims]") {
+ std::string test = "a quick brown fox jumped over the road\t\n";
- std::string test = "a quick brown fox jumped over the road\t\n";
+ REQUIRE(test.c_str()[test.length() - 1] == '\n');
+ REQUIRE(test.c_str()[test.length() - 2] == '\t');
+ test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test);
- REQUIRE(test.c_str()[test.length() - 1] == '\n');
- REQUIRE(test.c_str()[test.length() - 2] == '\t');
- test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test);
+ REQUIRE(test.c_str()[test.length() - 1] == 'd');
+ REQUIRE(test.c_str()[test.length() - 2] == 'a');
- REQUIRE(test.c_str()[test.length() - 1] == 'd');
- REQUIRE(test.c_str()[test.length() - 2] == 'a');
+ test = "a quick brown fox jumped over the road\v\t";
- test = "a quick brown fox jumped over the road\v\t";
+ REQUIRE(test.c_str()[test.length() - 1] == '\t');
+ REQUIRE(test.c_str()[test.length() - 2] == '\v');
- REQUIRE(test.c_str()[test.length() - 1] == '\t');
- REQUIRE(test.c_str()[test.length() - 2] == '\v');
+ test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test);
- test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test);
+ REQUIRE(test.c_str()[test.length() - 1] == 'd');
+ REQUIRE(test.c_str()[test.length() - 2] == 'a');
- REQUIRE(test.c_str()[test.length() - 1] == 'd');
- REQUIRE(test.c_str()[test.length() - 2] == 'a');
+ test = "a quick brown fox jumped over the road \f";
- test = "a quick brown fox jumped over the road \f";
+ REQUIRE(test.c_str()[test.length() - 1] == '\f');
+ REQUIRE(test.c_str()[test.length() - 2] == ' ');
- REQUIRE(test.c_str()[test.length() - 1] == '\f');
- REQUIRE(test.c_str()[test.length() - 2] == ' ');
-
- test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test);
-
- REQUIRE(test.c_str()[test.length() - 1] == 'd');
+ test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test);
+ REQUIRE(test.c_str()[test.length() - 1] == 'd');
}
TEST_CASE("Test Trimmer Left", "[testTrims]") {
+ std::string test = "\t\na quick brown fox jumped over the road\t\n";
- std::string test = "\t\na quick brown fox jumped over the road\t\n";
-
- REQUIRE(test.c_str()[0] == '\t');
- REQUIRE(test.c_str()[1] == '\n');
-
- test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test);
+ REQUIRE(test.c_str()[0] == '\t');
+ REQUIRE(test.c_str()[1] == '\n');
- REQUIRE(test.c_str()[0] == 'a');
- REQUIRE(test.c_str()[1] == ' ');
+ test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test);
- test = "\v\ta quick brown fox jumped over the road\v\t";
+ REQUIRE(test.c_str()[0] == 'a');
+ REQUIRE(test.c_str()[1] == ' ');
- REQUIRE(test.c_str()[0] == '\v');
- REQUIRE(test.c_str()[1] == '\t');
+ test = "\v\ta quick brown fox jumped over the road\v\t";
- test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test);
+ REQUIRE(test.c_str()[0] == '\v');
+ REQUIRE(test.c_str()[1] == '\t');
- REQUIRE(test.c_str()[0] == 'a');
- REQUIRE(test.c_str()[1] == ' ');
+ test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test);
- test = " \fa quick brown fox jumped over the road \f";
+ REQUIRE(test.c_str()[0] == 'a');
+ REQUIRE(test.c_str()[1] == ' ');
- REQUIRE(test.c_str()[0] == ' ');
- REQUIRE(test.c_str()[1] == '\f');
+ test = " \fa quick brown fox jumped over the road \f";
- test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test);
+ REQUIRE(test.c_str()[0] == ' ');
+ REQUIRE(test.c_str()[1] == '\f');
- REQUIRE(test.c_str()[0] == 'a');
- REQUIRE(test.c_str()[1] == ' ');
+ test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test);
+ REQUIRE(test.c_str()[0] == 'a');
+ REQUIRE(test.c_str()[1] == ' ');
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 67b5c65..9dbff36 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -77,7 +77,7 @@ class TestRepository : public core::Repository {
std::make_shared<provenance::ProvenanceEventRecord>();
if (eventRead->DeSerialize((uint8_t*) entry.second.data(),
- entry.second.length())) {
+ entry.second.length())) {
records.push_back(eventRead);
}
}
@@ -141,7 +141,7 @@ class TestFlowRepository : public core::repository::FlowFileRepository {
std::make_shared<provenance::ProvenanceEventRecord>();
if (eventRead->DeSerialize((uint8_t*) entry.second.data(),
- entry.second.length())) {
+ entry.second.length())) {
records.push_back(eventRead);
}
}
@@ -154,12 +154,14 @@ class TestFlowRepository : public core::repository::FlowFileRepository {
std::map<std::string, std::string> repositoryResults;
};
-class TestFlowController : public minifi::FlowController{
+class TestFlowController : public minifi::FlowController {
-public:
+ public:
TestFlowController(std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_file_repo)
- : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, "",true) {
+ : minifi::FlowController(repo, flow_file_repo,
+ std::make_shared<minifi::Configure>(), nullptr,
+ "", true) {
}
~TestFlowController() {
@@ -206,10 +208,10 @@ public:
}
std::shared_ptr<minifi::Connection> createConnection(std::string name,
- uuid_t uuid) {
+ uuid_t uuid) {
return 0;
}
-protected:
+ protected:
void initializePaths(const std::string &adjustedFilename) {
}
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp
index 947932e..f5374b8 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -18,28 +18,28 @@
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "../TestBase.h"
-#include "../unit/ProvenanceTestHelper.h"
-
+#include <utility>
+#include <memory>
+#include <string>
+#include <map>
+#include "ProvenanceTestHelper.h"
#include "provenance/Provenance.h"
#include "FlowFileRecord.h"
#include "core/Core.h"
#include "core/repository/FlowFileRepository.h"
TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
-
provenance::ProvenanceEventRecord record1(
provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah",
"blahblah");
REQUIRE(record1.getAttributes().size() == 0);
REQUIRE(record1.getAlternateIdentifierUri().length() == 0);
-
}
TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-
provenance::ProvenanceEventRecord record1(
- provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid",
- "componenttype");
+ provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE,
+ "componentid", "componenttype");
std::string eventId = record1.getEventId();
@@ -47,12 +47,13 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
record1.setDetails(smileyface);
uint64_t sample = 65555;
- std::shared_ptr<core::Repository> testRepository =std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> testRepository = std::make_shared<
+ TestRepository>();
record1.setEventDuration(sample);
record1.Serialize(testRepository);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository,eventId) == true);
+ REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
REQUIRE(record2.getEventId() == record1.getEventId());
REQUIRE(record2.getComponentId() == record1.getComponentId());
REQUIRE(record2.getComponentType() == record1.getComponentType());
@@ -62,32 +63,33 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
}
TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
-
provenance::ProvenanceEventRecord record1(
- provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid",
- "componenttype");
+ provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE,
+ "componentid", "componenttype");
std::string eventId = record1.getEventId();
std::map<std::string, std::string> attributes;
attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
- std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("./content_repository",0,0,0);
+ std::shared_ptr<core::repository::FlowFileRepository> frepo =
+ std::make_shared<core::repository::FlowFileRepository>(
+ "./content_repository", 0, 0, 0);
std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<
- minifi::FlowFileRecord>(frepo,attributes);
+ minifi::FlowFileRecord>(frepo, attributes);
record1.addChildFlowFile(ffr1);
- uint64_t sample = 65555;
- std::shared_ptr<core::Repository> testRepository =std::make_shared<TestRepository>();
+ uint64_t sample = 65555;
+ std::shared_ptr<core::Repository> testRepository = std::make_shared<
+ TestRepository>();
record1.setEventDuration(sample);
record1.Serialize(testRepository);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository,eventId) == true);
+ REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
REQUIRE(record1.getChildrenUuids().size() == 1);
REQUIRE(record2.getChildrenUuids().size() == 1);
std::string childId = record2.getChildrenUuids().at(0);
REQUIRE(childId == ffr1->getUUIDStr());
record2.removeChildUuid(childId);
REQUIRE(record2.getChildrenUuids().size() == 0);
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp
index 83ea49d..4b6c4ad 100644
--- a/libminifi/test/unit/RepoTests.cpp
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -17,8 +17,9 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "../TestBase.h"
-#include "../unit/ProvenanceTestHelper.h"
-
+#include <memory>
+#include <string>
+#include "ProvenanceTestHelper.h"
#include "provenance/Provenance.h"
#include "FlowFileRecord.h"
#include "core/Core.h"
@@ -26,18 +27,11 @@
#include "properties/Configure.h"
TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
-
TestController testController;
-
- //testController.setDebugToConsole();
-
char format[] = "/tmp/testRepo.XXXXXX";
char *dir = testController.createTempDirectory(format);
std::shared_ptr<core::repository::FlowFileRepository> repository =
- std::make_shared<core::repository::FlowFileRepository>(
- dir, 0,
- 0,
- 1);
+ std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1);
repository->initialize(std::make_shared<minifi::Configure>());
@@ -45,69 +39,44 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
record.addAttribute("keyA", "");
- REQUIRE( true == record.Serialize() );
+ REQUIRE(true == record.Serialize());
repository->stop();
-
testController.setNullAppender();
-
-
}
-
-
TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
-
TestController testController;
-
- //testController.setDebugToConsole();
-
char format[] = "/tmp/testRepo.XXXXXX";
char *dir = testController.createTempDirectory(format);
std::shared_ptr<core::repository::FlowFileRepository> repository =
- std::make_shared<core::repository::FlowFileRepository>(
- dir, 0,
- 0,
- 1);
+ std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1);
repository->initialize(std::make_shared<minifi::Configure>());
minifi::FlowFileRecord record(repository);
-
-
record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
-
- REQUIRE( true == record.Serialize() );
+ REQUIRE(true == record.Serialize());
repository->stop();
-
testController.setNullAppender();
-
-
}
-
TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
-
TestController testController;
-
- //testController.setDebugToConsole();
-
char format[] = "/tmp/testRepo.XXXXXX";
char *dir = testController.createTempDirectory(format);
std::shared_ptr<core::repository::FlowFileRepository> repository =
- std::make_shared<core::repository::FlowFileRepository>(
- dir, 0,
- 0,
- 1);
+ std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1);
- repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ repository->initialize(
+ std::make_shared<org::apache::nifi::minifi::Configure>());
minifi::FlowFileRecord record(repository);
@@ -115,7 +84,6 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
std::string uuid = record.getUUIDStr();
-
record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
record.addAttribute("keyB", "");
@@ -126,27 +94,21 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
record.addAttribute("", "sdgsdg");
-
-
-
- REQUIRE( true == record.Serialize() );
+ REQUIRE(true == record.Serialize());
repository->stop();
record2.DeSerialize(uuid);
std::string value;
- REQUIRE(true == record2.getAttribute("",value));
-
- REQUIRE( "hasdgasdgjsdgasgdsgsadaskgasd2" == value);
-
- REQUIRE(false == record2.getAttribute("key",value));
- REQUIRE(true == record2.getAttribute("keyA",value));
- REQUIRE( "hasdgasdgjsdgasgdsgsadaskgasd" == value);
-
- REQUIRE(true == record2.getAttribute("keyB",value));
- REQUIRE( "" == value);
+ REQUIRE(true == record2.getAttribute("", value));
+ REQUIRE("hasdgasdgjsdgasgdsgsadaskgasd2" == value);
+ REQUIRE(false == record2.getAttribute("key", value));
+ REQUIRE(true == record2.getAttribute("keyA", value));
+ REQUIRE("hasdgasdgjsdgasgdsgsadaskgasd" == value);
+ REQUIRE(true == record2.getAttribute("keyB", value));
+ REQUIRE("" == value);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/SerializationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SerializationTests.cpp b/libminifi/test/unit/SerializationTests.cpp
index d9cea0f..1946a92 100644
--- a/libminifi/test/unit/SerializationTests.cpp
+++ b/libminifi/test/unit/SerializationTests.cpp
@@ -30,28 +30,24 @@
#include "../unit/SiteToSiteHelper.h"
#define FMT_DEFAULT fmt_lower
-using namespace org::apache::nifi::minifi::io;
TEST_CASE("TestWriteUTF", "[MINIFI193]") {
+ org::apache::nifi::minifi::io::DataStream baseStream;
- DataStream baseStream;
+ org::apache::nifi::minifi::io::Serializable ser;
- Serializable ser;
-
- std::string stringOne = "helo world"; // yes, this has a typo.
+ std::string stringOne = "helo world"; // yes, this has a typo.
std::string verifyString;
ser.writeUTF(stringOne, &baseStream, false);
ser.readUTF(verifyString, &baseStream, false);
REQUIRE(verifyString == stringOne);
-
}
TEST_CASE("TestWriteUTF2", "[MINIFI193]") {
+ org::apache::nifi::minifi::io::DataStream baseStream;
- DataStream baseStream;
-
- Serializable ser;
+ org::apache::nifi::minifi::io::Serializable ser;
std::string stringOne = "hel\xa1o world";
REQUIRE(11 == stringOne.length());
@@ -61,14 +57,12 @@ TEST_CASE("TestWriteUTF2", "[MINIFI193]") {
ser.readUTF(verifyString, &baseStream, false);
REQUIRE(verifyString == stringOne);
-
}
TEST_CASE("TestWriteUTF3", "[MINIFI193]") {
+ org::apache::nifi::minifi::io::DataStream baseStream;
- DataStream baseStream;
-
- Serializable ser;
+ org::apache::nifi::minifi::io::Serializable ser;
std::string stringOne = "\xe4\xbd\xa0\xe5\xa5\xbd\xe4\xb8\x96\xe7\x95\x8c";
REQUIRE(12 == stringOne.length());
@@ -78,6 +72,5 @@ TEST_CASE("TestWriteUTF3", "[MINIFI193]") {
ser.readUTF(verifyString, &baseStream, false);
REQUIRE(verifyString == stringOne);
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/Site2SiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp
index 43afa0e..4ccf012 100644
--- a/libminifi/test/unit/Site2SiteTests.cpp
+++ b/libminifi/test/unit/Site2SiteTests.cpp
@@ -17,28 +17,29 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <uuid/uuid.h>
+#include <string>
+#include <memory>
+#include <utility>
+#include <map>
#include "io/BaseStream.h"
#include "Site2SitePeer.h"
#include "Site2SiteClientProtocol.h"
-#include <uuid/uuid.h>
#include "core/logging/LogAppenders.h"
#include "core/logging/BaseLogger.h"
#include <algorithm>
-#include <string>
-#include <memory>
-
#include "../TestBase.h"
#include "../unit/SiteToSiteHelper.h"
+
#define FMT_DEFAULT fmt_lower
-using namespace org::apache::nifi::minifi::io;
TEST_CASE("TestSetPortId", "[S2S1]") {
-
- std::unique_ptr<minifi::Site2SitePeer> peer =
- std::unique_ptr < minifi::Site2SitePeer
- > (new minifi::Site2SitePeer(
- std::unique_ptr < DataStream > (new DataStream()), "fake_host",
- 65433));
+ std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<
+ minifi::Site2SitePeer>(
+ new minifi::Site2SitePeer(
+ std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+ new org::apache::nifi::minifi::io::DataStream()),
+ "fake_host", 65433));
minifi::Site2SiteClientProtocol protocol(std::move(peer));
@@ -51,16 +52,15 @@ TEST_CASE("TestSetPortId", "[S2S1]") {
protocol.setPortId(fakeUUID);
REQUIRE(uuid_str == protocol.getPortId());
-
}
TEST_CASE("TestSetPortIdUppercase", "[S2S2]") {
-
- std::unique_ptr<minifi::Site2SitePeer> peer =
- std::unique_ptr < minifi::Site2SitePeer
- > (new minifi::Site2SitePeer(
- std::unique_ptr < DataStream > (new DataStream()), "fake_host",
- 65433));
+ std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<
+ minifi::Site2SitePeer>(
+ new minifi::Site2SitePeer(
+ std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+ new org::apache::nifi::minifi::io::DataStream()),
+ "fake_host", 65433));
minifi::Site2SiteClientProtocol protocol(std::move(peer));
@@ -77,12 +77,10 @@ TEST_CASE("TestSetPortIdUppercase", "[S2S2]") {
std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(), ::tolower);
REQUIRE(uuid_str == protocol.getPortId());
-
}
void sunny_path_bootstrap(SiteToSiteResponder *collector) {
-
- char a = 0x14; // RESOURCE_OK
+ char a = 0x14; // RESOURCE_OK
std::string resp_code;
resp_code.insert(resp_code.begin(), a);
collector->push_response(resp_code);
@@ -104,22 +102,23 @@ void sunny_path_bootstrap(SiteToSiteResponder *collector) {
TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
std::ostringstream oss;
- std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
- logging::BaseLogger>(
- new org::apache::nifi::minifi::core::logging::OutputStreamAppender(
- std::cout, std::make_shared<minifi::Configure>()));
- std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
- logger->updateLogger(std::move(outputLogger));
- logger->setLogLevel("trace");
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
+ new org::apache::nifi::minifi::core::logging::OutputStreamAppender(
+ std::cout, std::make_shared<minifi::Configure>()));
+ std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+ logger->updateLogger(std::move(outputLogger));
+ logger->setLogLevel("trace");
SiteToSiteResponder *collector = new SiteToSiteResponder();
sunny_path_bootstrap(collector);
- std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
- < minifi::Site2SitePeer
- > (new minifi::Site2SitePeer(
- std::unique_ptr < minifi::io::DataStream > (new BaseStream(collector)), "fake_host",
- 65433));
+ std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<
+ minifi::Site2SitePeer>(
+ new minifi::Site2SitePeer(
+ std::unique_ptr<minifi::io::DataStream>(
+ new org::apache::nifi::minifi::io::BaseStream(collector)),
+ "fake_host", 65433));
minifi::Site2SiteClientProtocol protocol(std::move(peer));
@@ -160,7 +159,7 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
REQUIRE(collector->get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "StandardFlowFileCodec");
- collector->get_next_client_response(); // codec version
+ collector->get_next_client_response(); // codec version
// start to send the stuff
// Create the transaction
@@ -170,19 +169,16 @@ TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
transaction = protocol.createTransaction(transactionID, minifi::SEND);
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES");
- std::map < std::string, std::string > attributes;
+ std::map<std::string, std::string> attributes;
minifi::DataPacket packet(&protocol, transaction, attributes, payload);
REQUIRE(protocol.send(transactionID, &packet, nullptr, nullptr) == true);
collector->get_next_client_response();
collector->get_next_client_response();
std::string rx_payload = collector->get_next_client_response();
- ;
REQUIRE(payload == rx_payload);
-
}
TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
-
SiteToSiteResponder *collector = new SiteToSiteResponder();
char a = 0xFF;
@@ -191,11 +187,12 @@ TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
collector->push_response(resp_code);
collector->push_response(resp_code);
- std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
- < minifi::Site2SitePeer
- > (new minifi::Site2SitePeer(
- std::unique_ptr < minifi::io::DataStream > (new BaseStream(collector)), "fake_host",
- 65433));
+ std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<
+ minifi::Site2SitePeer>(
+ new minifi::Site2SitePeer(
+ std::unique_ptr<minifi::io::DataStream>(
+ new org::apache::nifi::minifi::io::BaseStream(collector)),
+ "fake_host", 65433));
minifi::Site2SiteClientProtocol protocol(std::move(peer));
@@ -208,5 +205,4 @@ TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
protocol.setPortId(fakeUUID);
REQUIRE(false == protocol.bootstrap());
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/SiteToSiteHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index 8c33396..1876bde 100755
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -25,11 +25,11 @@
/**
* Test repository
*/
-class SiteToSiteResponder: public minifi::io::BaseStream {
-private:
+class SiteToSiteResponder : public minifi::io::BaseStream {
+ private:
std::queue<std::string> server_responses_;
std::queue<std::string> client_responses_;
-public:
+ public:
SiteToSiteResponder() {
}
// initialize
@@ -80,7 +80,7 @@ public:
* @return resulting read size
**/
virtual int read(uint16_t &base_value, bool is_little_endian =
- minifi::io::EndiannessCheck::IS_LITTLE) {
+ minifi::io::EndiannessCheck::IS_LITTLE) {
base_value = std::stoi(get_next_response());
return 2;
}
@@ -123,7 +123,7 @@ public:
* @return resulting read size
**/
virtual int read(uint32_t &value, bool is_little_endian =
- minifi::io::EndiannessCheck::IS_LITTLE) {
+ minifi::io::EndiannessCheck::IS_LITTLE) {
value = std::stoul(get_next_response());
return 4;
}
@@ -135,7 +135,7 @@ public:
* @return resulting read size
**/
virtual int read(uint64_t &value, bool is_little_endian =
- minifi::io::EndiannessCheck::IS_LITTLE) {
+ minifi::io::EndiannessCheck::IS_LITTLE) {
value = std::stoull(get_next_response());
return 8;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/SocketTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp
index 157e685..5a53b47 100644
--- a/libminifi/test/unit/SocketTests.cpp
+++ b/libminifi/test/unit/SocketTests.cpp
@@ -19,21 +19,25 @@
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include "../TestBase.h"
+#include <memory>
+#include <vector>
#include "io/ClientSocket.h"
-using namespace org::apache::nifi::minifi::io;
TEST_CASE("TestSocket", "[TestSocket1]") {
-
- Socket socket(std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()), "localhost", 8183);
+ org::apache::nifi::minifi::io::Socket socket(
+ std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
+ std::make_shared<minifi::Configure>()),
+ "localhost", 8183);
REQUIRE(-1 == socket.initialize());
REQUIRE("localhost" == socket.getHostname());
socket.closeStream();
-
}
TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
-
- Socket socket(std::make_shared<SocketContext>(std::make_shared<minifi::Configure>()), "localhost", 8183);
+ org::apache::nifi::minifi::io::Socket socket(
+ std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
+ std::make_shared<minifi::Configure>()),
+ "localhost", 8183);
REQUIRE(-1 == socket.initialize());
socket.writeData(0, 0);
@@ -44,21 +48,23 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") {
REQUIRE(-1 == socket.writeData(buffer, 1));
socket.closeStream();
-
}
TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
-
std::vector<uint8_t> buffer;
buffer.push_back('a');
-
- std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
- Socket server(socket_context, "localhost", 9183, 1);
+ std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context =
+ std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
+ std::make_shared<minifi::Configure>());
+
+ org::apache::nifi::minifi::io::Socket server(socket_context, "localhost",
+ 9183, 1);
REQUIRE(-1 != server.initialize());
- Socket client(socket_context, "localhost", 9183);
+ org::apache::nifi::minifi::io::Socket client(socket_context, "localhost",
+ 9183);
REQUIRE(-1 != client.initialize());
@@ -74,27 +80,27 @@ TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") {
server.closeStream();
client.closeStream();
-
}
TEST_CASE("TestGetHostName", "[TestSocket4]") {
-
- REQUIRE(Socket::getMyHostName().length() > 0);
-
+ REQUIRE(org::apache::nifi::minifi::io::Socket::getMyHostName().length() > 0);
}
TEST_CASE("TestWriteEndian64", "[TestSocket4]") {
-
std::vector<uint8_t> buffer;
buffer.push_back('a');
-
- std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
- Socket server(socket_context, "localhost", 9183, 1);
+ std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context =
+ std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
+ std::make_shared<minifi::Configure>());
+
+ org::apache::nifi::minifi::io::Socket server(socket_context, "localhost",
+ 9183, 1);
REQUIRE(-1 != server.initialize());
- Socket client(socket_context, "localhost", 9183);
+ org::apache::nifi::minifi::io::Socket client(socket_context, "localhost",
+ 9183);
REQUIRE(-1 != client.initialize());
@@ -109,24 +115,25 @@ TEST_CASE("TestWriteEndian64", "[TestSocket4]") {
server.closeStream();
client.closeStream();
-
}
TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
-
std::vector<uint8_t> buffer;
buffer.push_back('a');
- std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
-
- Socket server(socket_context, "localhost", 9183, 1);
+ std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context =
+ std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
+ std::make_shared<minifi::Configure>());
+
+ org::apache::nifi::minifi::io::Socket server(socket_context, "localhost",
+ 9183, 1);
REQUIRE(-1 != server.initialize());
- Socket client(socket_context, "localhost", 9183);
+ org::apache::nifi::minifi::io::Socket client(socket_context, "localhost",
+ 9183);
REQUIRE(-1 != client.initialize());
-
{
uint32_t negative_one = -1;
REQUIRE(4 == client.write(negative_one));
@@ -136,7 +143,6 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
REQUIRE(negative_two == negative_one);
}
-
{
uint16_t negative_one = -1;
REQUIRE(2 == client.write(negative_one));
@@ -149,21 +155,23 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") {
server.closeStream();
client.closeStream();
-
}
TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") {
-
std::vector<uint8_t> buffer;
buffer.push_back('a');
- std::shared_ptr<SocketContext> socket_context = std::make_shared<SocketContext>(std::make_shared<minifi::Configure>());
-
- Socket server(socket_context, "localhost", 9183, 1);
+ std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context =
+ std::make_shared<org::apache::nifi::minifi::io::SocketContext>(
+ std::make_shared<minifi::Configure>());
+
+ org::apache::nifi::minifi::io::Socket server(socket_context, "localhost",
+ 9183, 1);
REQUIRE(-1 != server.initialize());
- Socket client(socket_context, "localhost", 9183);
+ org::apache::nifi::minifi::io::Socket client(socket_context, "localhost",
+ 9183);
REQUIRE(-1 != client.initialize());
@@ -181,5 +189,4 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") {
REQUIRE(-1 == client.writeData(buffer, 1));
server.closeStream();
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/Tests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/Tests.cpp b/libminifi/test/unit/Tests.cpp
index a826f3a..291fc2f 100644
--- a/libminifi/test/unit/Tests.cpp
+++ b/libminifi/test/unit/Tests.cpp
@@ -18,11 +18,9 @@
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
-
-
#include "utils/TimeUtil.h"
#include "../TestBase.h"
-TEST_CASE("Test time conversion", "[testtimeconversion]"){
- REQUIRE ( "2017-02-16 20:14:56.196" == getTimeStr(1487276096196,true) );
+TEST_CASE("Test time conversion", "[testtimeconversion]") {
+ REQUIRE("2017-02-16 20:14:56.196" == getTimeStr(1487276096196, true));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index cd34293..0fa75e6 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -17,6 +17,7 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <utility>
#include <future>
#include "../TestBase.h"
#include "utils/ThreadPool.h"
@@ -31,9 +32,6 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") {
utils::Worker<bool> functor(f_ex);
pool.start();
std::future<bool> fut = pool.execute(std::move(functor));
-
fut.wait();
-
REQUIRE(true == fut.get());
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/YamlConfigurationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/YamlConfigurationTests.cpp b/libminifi/test/unit/YamlConfigurationTests.cpp
index f958ca1..a7ed5df 100644
--- a/libminifi/test/unit/YamlConfigurationTests.cpp
+++ b/libminifi/test/unit/YamlConfigurationTests.cpp
@@ -17,7 +17,7 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
-
+#include <map>
#include <memory>
#include <string>
#include <core/RepositoryFactory.h>
@@ -25,169 +25,168 @@
#include "../TestBase.h"
TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
-
- std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository", true);
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<minifi::io::StreamFactory>(configuration);
- core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(testProvRepo, testFlowFileRepo, streamFactory, configuration);
+ std::shared_ptr<core::Repository> testProvRepo = core::createRepository(
+ "provenancerepository", true);
+ std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository(
+ "flowfilerepository", true);
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+ minifi::Configure>();
+ std::shared_ptr<minifi::io::StreamFactory> streamFactory = std::make_shared<
+ minifi::io::StreamFactory>(configuration);
+ core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(
+ testProvRepo, testFlowFileRepo, streamFactory, configuration);
SECTION("loading YAML without optional component IDs works") {
+ static const std::string CONFIG_YAML_WITHOUT_IDS = ""
+ "MiNiFi Config Version: 1\n"
+ "Flow Controller:\n"
+ " name: MiNiFi Flow\n"
+ " comment:\n"
+ "\n"
+ "Core Properties:\n"
+ " flow controller graceful shutdown period: 10 sec\n"
+ " flow service write delay interval: 500 ms\n"
+ " administrative yield duration: 30 sec\n"
+ " bored yield duration: 10 millis\n"
+ "\n"
+ "FlowFile Repository:\n"
+ " partitions: 256\n"
+ " checkpoint interval: 2 mins\n"
+ " always sync: false\n"
+ " Swap:\n"
+ " threshold: 20000\n"
+ " in period: 5 sec\n"
+ " in threads: 1\n"
+ " out period: 5 sec\n"
+ " out threads: 4\n"
+ "\n"
+ "Provenance Repository:\n"
+ " provenance rollover time: 1 min\n"
+ "\n"
+ "Content Repository:\n"
+ " content claim max appendable size: 10 MB\n"
+ " content claim max flow files: 100\n"
+ " always sync: false\n"
+ "\n"
+ "Component Status Repository:\n"
+ " buffer size: 1440\n"
+ " snapshot frequency: 1 min\n"
+ "\n"
+ "Security Properties:\n"
+ " keystore: /tmp/ssl/localhost-ks.jks\n"
+ " keystore type: JKS\n"
+ " keystore password: localtest\n"
+ " key password: localtest\n"
+ " truststore: /tmp/ssl/localhost-ts.jks\n"
+ " truststore type: JKS\n"
+ " truststore password: localtest\n"
+ " ssl protocol: TLS\n"
+ " Sensitive Props:\n"
+ " key:\n"
+ " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
+ " provider: BC\n"
+ "\n"
+ "Processors:\n"
+ " - name: TailFile\n"
+ " class: org.apache.nifi.processors.standard.TailFile\n"
+ " max concurrent tasks: 1\n"
+ " scheduling strategy: TIMER_DRIVEN\n"
+ " scheduling period: 1 sec\n"
+ " penalization period: 30 sec\n"
+ " yield period: 1 sec\n"
+ " run duration nanos: 0\n"
+ " auto-terminated relationships list:\n"
+ " Properties:\n"
+ " File to Tail: logs/minifi-app.log\n"
+ " Rolling Filename Pattern: minifi-app*\n"
+ " Initial Start Position: Beginning of File\n"
+ "\n"
+ "Connections:\n"
+ " - name: TailToS2S\n"
+ " source name: TailFile\n"
+ " source relationship name: success\n"
+ " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+ " max work queue size: 0\n"
+ " max work queue data size: 1 MB\n"
+ " flowfile expiration: 60 sec\n"
+ " queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
+ "\n"
+ "Remote Processing Groups:\n"
+ " - name: NiFi Flow\n"
+ " comment:\n"
+ " url: https://localhost:8090/nifi\n"
+ " timeout: 30 secs\n"
+ " yield period: 10 sec\n"
+ " Input Ports:\n"
+ " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+ " name: tailed log\n"
+ " comments:\n"
+ " max concurrent tasks: 1\n"
+ " use compression: false\n"
+ "\n"
+ "Provenance Reporting:\n"
+ " comment:\n"
+ " scheduling strategy: TIMER_DRIVEN\n"
+ " scheduling period: 30 sec\n"
+ " host: localhost\n"
+ " port name: provenance\n"
+ " port: 8090\n"
+ " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
+ " destination url: https://localhost:8090/\n"
+ " originating url: http://${hostname(true)}:8081/nifi\n"
+ " use compression: true\n"
+ " timeout: 30 secs\n"
+ " batch size: 1000";
- static const std::string CONFIG_YAML_WITHOUT_IDS = ""
- "MiNiFi Config Version: 1\n"
- "Flow Controller:\n"
- " name: MiNiFi Flow\n"
- " comment:\n"
- "\n"
- "Core Properties:\n"
- " flow controller graceful shutdown period: 10 sec\n"
- " flow service write delay interval: 500 ms\n"
- " administrative yield duration: 30 sec\n"
- " bored yield duration: 10 millis\n"
- "\n"
- "FlowFile Repository:\n"
- " partitions: 256\n"
- " checkpoint interval: 2 mins\n"
- " always sync: false\n"
- " Swap:\n"
- " threshold: 20000\n"
- " in period: 5 sec\n"
- " in threads: 1\n"
- " out period: 5 sec\n"
- " out threads: 4\n"
- "\n"
- "Provenance Repository:\n"
- " provenance rollover time: 1 min\n"
- "\n"
- "Content Repository:\n"
- " content claim max appendable size: 10 MB\n"
- " content claim max flow files: 100\n"
- " always sync: false\n"
- "\n"
- "Component Status Repository:\n"
- " buffer size: 1440\n"
- " snapshot frequency: 1 min\n"
- "\n"
- "Security Properties:\n"
- " keystore: /tmp/ssl/localhost-ks.jks\n"
- " keystore type: JKS\n"
- " keystore password: localtest\n"
- " key password: localtest\n"
- " truststore: /tmp/ssl/localhost-ts.jks\n"
- " truststore type: JKS\n"
- " truststore password: localtest\n"
- " ssl protocol: TLS\n"
- " Sensitive Props:\n"
- " key:\n"
- " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
- " provider: BC\n"
- "\n"
- "Processors:\n"
- " - name: TailFile\n"
- " class: org.apache.nifi.processors.standard.TailFile\n"
- " max concurrent tasks: 1\n"
- " scheduling strategy: TIMER_DRIVEN\n"
- " scheduling period: 1 sec\n"
- " penalization period: 30 sec\n"
- " yield period: 1 sec\n"
- " run duration nanos: 0\n"
- " auto-terminated relationships list:\n"
- " Properties:\n"
- " File to Tail: logs/minifi-app.log\n"
- " Rolling Filename Pattern: minifi-app*\n"
- " Initial Start Position: Beginning of File\n"
- "\n"
- "Connections:\n"
- " - name: TailToS2S\n"
- " source name: TailFile\n"
- " source relationship name: success\n"
- " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
- " max work queue size: 0\n"
- " max work queue data size: 1 MB\n"
- " flowfile expiration: 60 sec\n"
- " queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
- "\n"
- "Remote Processing Groups:\n"
- " - name: NiFi Flow\n"
- " comment:\n"
- " url: https://localhost:8090/nifi\n"
- " timeout: 30 secs\n"
- " yield period: 10 sec\n"
- " Input Ports:\n"
- " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
- " name: tailed log\n"
- " comments:\n"
- " max concurrent tasks: 1\n"
- " use compression: false\n"
- "\n"
- "Provenance Reporting:\n"
- " comment:\n"
- " scheduling strategy: TIMER_DRIVEN\n"
- " scheduling period: 30 sec\n"
- " host: localhost\n"
- " port name: provenance\n"
- " port: 8090\n"
- " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
- " destination url: https://localhost:8090/\n"
- " originating url: http://${hostname(true)}:8081/nifi\n"
- " use compression: true\n"
- " timeout: 30 secs\n"
- " batch size: 1000";
+ std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
+ std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getRoot(configYamlStream);
- std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
- std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getRoot(configYamlStream);
+ REQUIRE(rootFlowConfig);
+ REQUIRE(rootFlowConfig->findProcessor("TailFile"));
+ REQUIRE(NULL != rootFlowConfig->findProcessor("TailFile")->getUUID());
+ REQUIRE(!rootFlowConfig->findProcessor("TailFile")->getUUIDStr().empty());
+ REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
+ REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessor("TailFile")->getSchedulingStrategy());
+ REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
+ REQUIRE(1*1000*1000*1000 == rootFlowConfig->findProcessor("TailFile")->getSchedulingPeriodNano());
+ REQUIRE(30*1000 == rootFlowConfig->findProcessor("TailFile")->getPenalizationPeriodMsec());
+ REQUIRE(1*1000 == rootFlowConfig->findProcessor("TailFile")->getYieldPeriodMsec());
+ REQUIRE(0 == rootFlowConfig->findProcessor("TailFile")->getRunDurationNano());
- REQUIRE(rootFlowConfig);
- REQUIRE(rootFlowConfig->findProcessor("TailFile"));
- REQUIRE(NULL != rootFlowConfig->findProcessor("TailFile")->getUUID());
- REQUIRE(!rootFlowConfig->findProcessor("TailFile")->getUUIDStr().empty());
- REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
- REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessor("TailFile")->getSchedulingStrategy());
- REQUIRE(1 == rootFlowConfig->findProcessor("TailFile")->getMaxConcurrentTasks());
- REQUIRE(1*1000*1000*1000 == rootFlowConfig->findProcessor("TailFile")->getSchedulingPeriodNano());
- REQUIRE(30*1000 == rootFlowConfig->findProcessor("TailFile")->getPenalizationPeriodMsec());
- REQUIRE(1*1000 == rootFlowConfig->findProcessor("TailFile")->getYieldPeriodMsec());
- REQUIRE(0 == rootFlowConfig->findProcessor("TailFile")->getRunDurationNano());
-
- std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
- rootFlowConfig->getConnections(connectionMap);
- REQUIRE(1 == connectionMap.size());
- // This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it
- for(
- std::map<std::string,std::shared_ptr<minifi::Connection>>::iterator it = connectionMap.begin();
- it != connectionMap.end();
- ++it) {
- REQUIRE(it->second);
- REQUIRE(!it->second->getUUIDStr().empty());
- REQUIRE(it->second->getDestination());
- REQUIRE(it->second->getSource());
- }
+ std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+ rootFlowConfig->getConnections(connectionMap);
+ REQUIRE(1 == connectionMap.size());
+ // This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it
+ for (auto it : connectionMap) {
+ REQUIRE(it.second);
+ REQUIRE(!it.second->getUUIDStr().empty());
+ REQUIRE(it.second->getDestination());
+ REQUIRE(it.second->getSource());
}
+}
SECTION("missing required field in YAML throws exception") {
+ static const std::string CONFIG_YAML_NO_RPG_PORT_ID = ""
+ "MiNiFi Config Version: 1\n"
+ "Flow Controller:\n"
+ " name: MiNiFi Flow\n"
+ "Processors: []\n"
+ "Connections: []\n"
+ "Remote Processing Groups:\n"
+ " - name: NiFi Flow\n"
+ " comment:\n"
+ " url: https://localhost:8090/nifi\n"
+ " timeout: 30 secs\n"
+ " yield period: 10 sec\n"
+ " Input Ports:\n"
+ " - name: tailed log\n"
+ " comments:\n"
+ " max concurrent tasks: 1\n"
+ " use compression: false\n"
+ "\n";
- static const std::string CONFIG_YAML_NO_RPG_PORT_ID = ""
- "MiNiFi Config Version: 1\n"
- "Flow Controller:\n"
- " name: MiNiFi Flow\n"
- "Processors: []\n"
- "Connections: []\n"
- "Remote Processing Groups:\n"
- " - name: NiFi Flow\n"
- " comment:\n"
- " url: https://localhost:8090/nifi\n"
- " timeout: 30 secs\n"
- " yield period: 10 sec\n"
- " Input Ports:\n"
- " - name: tailed log\n"
- " comments:\n"
- " max concurrent tasks: 1\n"
- " use compression: false\n"
- "\n";
-
- std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID);
- REQUIRE_THROWS_AS(yamlConfig->getRoot(configYamlStream), std::invalid_argument);
- }
+ std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID);
+ REQUIRE_THROWS_AS(yamlConfig->getRoot(configYamlStream), std::invalid_argument);
+}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/thirdparty/google-styleguide/run_linter.sh
----------------------------------------------------------------------
diff --git a/thirdparty/google-styleguide/run_linter.sh b/thirdparty/google-styleguide/run_linter.sh
index fbf8730..102145f 100755
--- a/thirdparty/google-styleguide/run_linter.sh
+++ b/thirdparty/google-styleguide/run_linter.sh
@@ -26,4 +26,4 @@ HEADERS=`find ${1} -name '*.h' | tr '\n' ','`
SOURCES=`find ${2} -name '*.cpp' | tr '\n' ' '`
echo ${HEADERS}
echo ${SOURCES}
-python ${SCRIPT_DIR}/cpplint.py --linelength=128 --headers=${HEADERS} ${SOURCES}
+python ${SCRIPT_DIR}/cpplint.py --linelength=200 --headers=${HEADERS} ${SOURCES}
[2/2] nifi-minifi-cpp git commit: MINIFI-289: Update test folder to
apply linter and set max characters to 200
Posted by al...@apache.org.
MINIFI-289: Update test folder to apply linter and set max characters to 200
This closes #97.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/63c53bcf
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/63c53bcf
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/63c53bcf
Branch: refs/heads/master
Commit: 63c53bcfd59b94f42bdb01bd7e8154c0aceafca3
Parents: bc0d65e
Author: Marc Parisi <ph...@apache.org>
Authored: Tue May 16 13:58:28 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri May 19 13:31:11 2017 -0400
----------------------------------------------------------------------
CMakeLists.txt | 6 +
cmake/BuildTests.cmake | 2 +-
cmake/DockerConfig.cmake | 4 -
libminifi/src/io/CRCStream.cpp | 3 +-
libminifi/test/CPPLINT.cfg | 3 +
libminifi/test/Server.cpp | 919 +++++++++----------
libminifi/test/TestBase.h | 6 +-
.../ControllerServiceIntegrationTests.cpp | 5 +-
.../test/integration/HttpGetIntegrationTest.cpp | 20 +-
.../integration/HttpPostIntegrationTest.cpp | 8 +-
.../test/integration/TestExecuteProcess.cpp | 39 +-
libminifi/test/nodefs/NoLevelDB.cpp | 2 +-
libminifi/test/nodefs/NoYamlConfiguration.cpp | 11 +-
libminifi/test/unit/CRCTests.cpp | 13 +-
libminifi/test/unit/ClassLoaderTests.cpp | 24 +-
libminifi/test/unit/ControllerServiceTests.cpp | 9 +-
libminifi/test/unit/InvokeHTTPTests.cpp | 14 +-
libminifi/test/unit/LoggerTests.cpp | 149 ++-
libminifi/test/unit/ProcessorTests.cpp | 33 +-
libminifi/test/unit/PropertyTests.cpp | 105 +--
libminifi/test/unit/ProvenanceTestHelper.h | 16 +-
libminifi/test/unit/ProvenanceTests.cpp | 38 +-
libminifi/test/unit/RepoTests.cpp | 74 +-
libminifi/test/unit/SerializationTests.cpp | 21 +-
libminifi/test/unit/Site2SiteTests.cpp | 84 +-
libminifi/test/unit/SiteToSiteHelper.h | 12 +-
libminifi/test/unit/SocketTests.cpp | 79 +-
libminifi/test/unit/Tests.cpp | 6 +-
libminifi/test/unit/ThreadPoolTests.cpp | 4 +-
libminifi/test/unit/YamlConfigurationTests.cpp | 311 ++++---
thirdparty/google-styleguide/run_linter.sh | 2 +-
31 files changed, 940 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d392512..83b867a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -140,6 +140,7 @@ set(CPACK_BINARY_TGZ, "ON")
set(CPACK_ARCHIVE_COMPONENT_INSTALL ON)
set(CPACK_COMPONENTS_ALL bin)
+
### include modules
include(CPack)
@@ -150,3 +151,8 @@ include(BuildDocs)
include(DockerConfig)
+# Create a custom build target that will run the linter.
+add_custom_target(
+ linter
+ COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/google-styleguide/run_linter.sh ${CMAKE_SOURCE_DIR}/libminifi/include/ ${CMAKE_SOURCE_DIR}/libminifi/src/
+ COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/google-styleguide/run_linter.sh ${CMAKE_SOURCE_DIR}/libminifi/include/ ${CMAKE_SOURCE_DIR}/libminifi/test/ )
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 0c96842..67ffbea 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -30,7 +30,7 @@ MACRO(GETSOURCEFILES result curdir)
ENDMACRO()
function(createTests testName)
- message ("File name is ${testName}")
+ message ("-- Adding test: ${testName}")
target_include_directories(${testName} PRIVATE BEFORE ${UUID_INCLUDE_DIRS})
target_include_directories(${testName} PRIVATE BEFORE "thirdparty/catch")
target_include_directories(${testName} PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/cmake/DockerConfig.cmake
----------------------------------------------------------------------
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 78a1140..41ca7f7 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -21,8 +21,4 @@ add_custom_target(
COMMAND ${CMAKE_SOURCE_DIR}/docker/DockerBuild.sh 1000 1000 ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH} minificppsource ${CMAKE_SOURCE_DIR}
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/docker/)
-# Create a custom build target that will run the linter.
-add_custom_target(
- linter
- COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/google-styleguide/run_linter.sh ${CMAKE_SOURCE_DIR}/libminifi/include/ ${CMAKE_SOURCE_DIR}/libminifi/src/ )
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/src/io/CRCStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/CRCStream.cpp b/libminifi/src/io/CRCStream.cpp
index e06a8f5..1322dc6 100644
--- a/libminifi/src/io/CRCStream.cpp
+++ b/libminifi/src/io/CRCStream.cpp
@@ -16,7 +16,6 @@
* limitations under the License.
*/
+#include "io/CRCStream.h"
#include <zlib.h>
#include <memory>
-#include "io/CRCStream.h"
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/CPPLINT.cfg
----------------------------------------------------------------------
diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg
new file mode 100644
index 0000000..beed48a
--- /dev/null
+++ b/libminifi/test/CPPLINT.cfg
@@ -0,0 +1,3 @@
+set noparent
+filter=-build/include_order,-build/include_alpha
+exclude_files=Server.cpp
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/Server.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp
index 9428ee0..875e7a9 100644
--- a/libminifi/test/Server.cpp
+++ b/libminifi/test/Server.cpp
@@ -1,5 +1,5 @@
/* A simple server in the internet domain using TCP
- The port number is passed as an argument */
+ The port number is passed as an argument */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -24,583 +24,516 @@
// FlowControl Protocol Msg Type
typedef enum {
- REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version
- REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.yml from server ask device to apply and also device report interval
- REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow YAML name/version and other period report info
- REPORT_RESP, // Report Respond from server to device, may ask device to update flow YAML or processor property
- MAX_FLOW_CONTROL_MSG_TYPE
+ REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version
+ REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.yml from server ask device to apply and also device report interval
+ REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow YAML name/version and other period report info
+ REPORT_RESP, // Report Respond from server to device, may ask device to update flow YAML or processor property
+ MAX_FLOW_CONTROL_MSG_TYPE
} FlowControlMsgType;
// FlowControl Protocol Msg Type String
-static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
-{
- "REGISTER_REQ",
- "REGISTER_RESP",
- "REPORT_REQ",
- "REPORT_RESP"
-};
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = {
+ "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
// Flow Control Msg Type to String
-inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
-{
- if (type < MAX_FLOW_CONTROL_MSG_TYPE)
- return FlowControlMsgTypeStr[type];
- else
- return NULL;
+inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) {
+ if (type < MAX_FLOW_CONTROL_MSG_TYPE)
+ return FlowControlMsgTypeStr[type];
+ else
+ return NULL;
}
// FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
typedef enum {
- //Fix length 8 bytes: client to server in register request, required field
- FLOW_SERIAL_NUMBER,
- // Flow YAML name TLV: client to server in register request and report request, required field
- FLOW_YAML_NAME,
- // Flow YAML content, TLV: server to client in register respond, option field in case server want to ask client to load YAML from server
- FLOW_YAML_CONTENT,
- // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
- REPORT_INTERVAL,
- // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
- PROCESSOR_NAME,
- // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
- PROPERTY_NAME,
- // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
- PROPERTY_VALUE,
- // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
- REPORT_BLOB,
- MAX_FLOW_MSG_ID
+ //Fix length 8 bytes: client to server in register request, required field
+ FLOW_SERIAL_NUMBER,
+ // Flow YAML name TLV: client to server in register request and report request, required field
+ FLOW_YAML_NAME,
+ // Flow YAML content, TLV: server to client in register respond, option field in case server want to ask client to load YAML from server
+ FLOW_YAML_CONTENT,
+ // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
+ REPORT_INTERVAL,
+ // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
+ PROCESSOR_NAME,
+ // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
+ PROPERTY_NAME,
+ // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
+ PROPERTY_VALUE,
+ // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
+ REPORT_BLOB,
+ MAX_FLOW_MSG_ID
} FlowControlMsgID;
// FlowControl Protocol Msg ID String
-static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
-{
- "FLOW_SERIAL_NUMBER",
- "FLOW_YAML_NAME",
- "FLOW_YAML_CONTENT",
- "REPORT_INTERVAL",
- "PROCESSOR_NAME"
- "PROPERTY_NAME",
- "PROPERTY_VALUE",
- "REPORT_BLOB"
-};
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = {
+ "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT",
+ "REPORT_INTERVAL", "PROCESSOR_NAME"
+ "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };
#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
// FlowControl Protocol Msg Len
-inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
-{
- if (id == FLOW_SERIAL_NUMBER)
- return (TYPE_HDR_LEN + 8);
- else if (id == REPORT_INTERVAL)
- return (TYPE_HDR_LEN + 4);
- else if (id < MAX_FLOW_MSG_ID)
- return (TLV_HDR_LEN + payLoadLen);
- else
- return -1;
+inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) {
+ if (id == FLOW_SERIAL_NUMBER)
+ return (TYPE_HDR_LEN + 8);
+ else if (id == REPORT_INTERVAL)
+ return (TYPE_HDR_LEN + 4);
+ else if (id < MAX_FLOW_MSG_ID)
+ return (TLV_HDR_LEN + payLoadLen);
+ else
+ return -1;
}
// Flow Control Msg Id to String
-inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
-{
- if (id < MAX_FLOW_MSG_ID)
- return FlowControlMsgIDStr[id];
- else
- return NULL;
+inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) {
+ if (id < MAX_FLOW_MSG_ID)
+ return FlowControlMsgIDStr[id];
+ else
+ return NULL;
}
// Flow Control Respond status code
typedef enum {
- RESP_SUCCESS,
- RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
- RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
- RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
- RESP_FAILURE,
- MAX_RESP_CODE
+ RESP_SUCCESS,
+ RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
+ RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
+ RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
+ RESP_FAILURE,
+ MAX_RESP_CODE
} FlowControlRespCode;
// FlowControl Resp Code str
-static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
-{
- "RESP_SUCCESS",
- "RESP_TRIGGER_REGISTER",
- "RESP_START_FLOW_CONTROLLER",
- "RESP_STOP_FLOW_CONTROLLER",
- "RESP_FAILURE"
-};
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS",
+ "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER",
+ "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
// Flow Control Resp Code to String
-inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
-{
- if (code < MAX_RESP_CODE)
- return FlowControlRespCodeStr[code];
- else
- return NULL;
+inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) {
+ if (code < MAX_RESP_CODE)
+ return FlowControlRespCodeStr[code];
+ else
+ return NULL;
}
// Common FlowControlProtocol Header
typedef struct {
- uint32_t msgType; // Msg Type
- uint32_t seqNumber; // Seq Number to match Req with Resp
- uint32_t status; // Resp Code, see FlowControlRespCode
- uint32_t payloadLen; // Msg Payload length
+ uint32_t msgType; // Msg Type
+ uint32_t seqNumber; // Seq Number to match Req with Resp
+ uint32_t status; // Resp Code, see FlowControlRespCode
+ uint32_t payloadLen; // Msg Payload length
} FlowControlProtocolHeader;
-
// encode uint32_t
-uint8_t *encode(uint8_t *buf, uint32_t value)
-{
- *buf++ = (value & 0xFF000000) >> 24;
- *buf++ = (value & 0x00FF0000) >> 16;
- *buf++ = (value & 0x0000FF00) >> 8;
- *buf++ = (value & 0x000000FF);
- return buf;
+uint8_t *encode(uint8_t *buf, uint32_t value) {
+ *buf++ = (value & 0xFF000000) >> 24;
+ *buf++ = (value & 0x00FF0000) >> 16;
+ *buf++ = (value & 0x0000FF00) >> 8;
+ *buf++ = (value & 0x000000FF);
+ return buf;
}
// encode uint32_t
-uint8_t *decode(uint8_t *buf, uint32_t &value)
-{
- value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
- return (buf + 4);
+uint8_t *decode(uint8_t *buf, uint32_t &value) {
+ value = ((buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | (buf[3]));
+ return (buf + 4);
}
// encode byte array
-uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
-{
- memcpy(buf, bufArray, size);
- buf += size;
- return buf;
+uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) {
+ memcpy(buf, bufArray, size);
+ buf += size;
+ return buf;
}
// encode std::string
-uint8_t *encode(uint8_t *buf, std::string value)
-{
- // add the \0 for size
- buf = encode(buf, value.size()+1);
- buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
- return buf;
+uint8_t *encode(uint8_t *buf, std::string value) {
+ // add the \0 for size
+ buf = encode(buf, value.size() + 1);
+ buf = encode(buf, (uint8_t *) value.c_str(), value.size() + 1);
+ return buf;
}
-int sendData(int socket, uint8_t *buf, int buflen)
-{
- int ret = 0, bytes = 0;
-
- while (bytes < buflen)
- {
- ret = send(socket, buf+bytes, buflen-bytes, 0);
- //check for errors
- if (ret == -1)
- {
- return ret;
- }
- bytes+=ret;
- }
-
- return bytes;
+int sendData(int socket, uint8_t *buf, int buflen) {
+ int ret = 0, bytes = 0;
+
+ while (bytes < buflen) {
+ ret = send(socket, buf + bytes, buflen - bytes, 0);
+ //check for errors
+ if (ret == -1) {
+ return ret;
+ }
+ bytes += ret;
+ }
+
+ return bytes;
}
-void error(const char *msg)
-{
- perror(msg);
- exit(1);
+void error(const char *msg) {
+ perror(msg);
+ exit(1);
}
/* readline - read a '\n' terminated line from socket fd
- into buffer bufptr of size len. The line in the
- buffer is terminated with '\0'.
- It returns -1 in case of error or if
- the capacity of the buffer is exceeded.
- It returns 0 if EOF is encountered before reading '\n'.
+ into buffer bufptr of size len. The line in the
+ buffer is terminated with '\0'.
+ It returns -1 in case of error or if
+ the capacity of the buffer is exceeded.
+ It returns 0 if EOF is encountered before reading '\n'.
*/
-int readline( int fd, char *bufptr, size_t len )
-{
+int readline(int fd, char *bufptr, size_t len) {
/* Note that this function is very tricky. It uses the
- static variables bp, cnt, and b to establish a local buffer.
- The recv call requests large chunks of data (the size of the buffer).
- Then if the recv call reads more than one line, the overflow
- remains in the buffer and it is made available to the next call
- to readline.
- Notice also that this routine reads up to '\n' and overwrites
- it with '\0'. Thus if the line is really terminated with
- "\r\n", the '\r' will remain unchanged.
- */
+ static variables bp, cnt, and b to establish a local buffer.
+ The recv call requests large chunks of data (the size of the buffer).
+ Then if the recv call reads more than one line, the overflow
+ remains in the buffer and it is made available to the next call
+ to readline.
+ Notice also that this routine reads up to '\n' and overwrites
+ it with '\0'. Thus if the line is really terminated with
+ "\r\n", the '\r' will remain unchanged.
+ */
char *bufx = bufptr;
static char *bp;
static int cnt = 0;
- static char b[ 4096 ];
+ static char b[4096];
char c;
-
- while ( --len > 0 )
- {
- if ( --cnt <= 0 )
- {
- cnt = recv( fd, b, sizeof( b ), 0 );
- if ( cnt < 0 )
- {
- if ( errno == EINTR )
- {
- len++; /* the while will decrement */
- continue;
- }
- return -1;
- }
- if ( cnt == 0 )
- return 0;
- bp = b;
- }
- c = *bp++;
- *bufptr++ = c;
- if ( c == '\n' )
- {
- *bufptr = '\0';
- return bufptr - bufx;
- }
+
+ while (--len > 0) {
+ if (--cnt <= 0) {
+ cnt = recv(fd, b, sizeof(b), 0);
+ if (cnt < 0) {
+ if ( errno == EINTR) {
+ len++; /* the while will decrement */
+ continue;
+ }
+ return -1;
+ }
+ if (cnt == 0)
+ return 0;
+ bp = b;
+ }
+ c = *bp++;
+ *bufptr++ = c;
+ if (c == '\n') {
+ *bufptr = '\0';
+ return bufptr - bufx;
}
+ }
return -1;
}
-int readData(int socket, uint8_t *buf, int buflen)
-{
- int sendSize = buflen;
- int status;
+int readData(int socket, uint8_t *buf, int buflen) {
+ int sendSize = buflen;
+ int status;
- while (buflen)
- {
+ while (buflen) {
#ifndef __MACH__
- status = read(socket, buf, buflen);
+ status = read(socket, buf, buflen);
#else
- status = recv(socket, buf, buflen, 0);
+ status = recv(socket, buf, buflen, 0);
#endif
- if (status <= 0)
- {
- return status;
- }
- buflen -= status;
- buf += status;
- }
-
- return sendSize;
+ if (status <= 0) {
+ return status;
+ }
+ buflen -= status;
+ buf += status;
+ }
+
+ return sendSize;
}
-int readHdr(int socket, FlowControlProtocolHeader *hdr)
-{
- uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+int readHdr(int socket, FlowControlProtocolHeader *hdr) {
+ uint8_t buffer[sizeof(FlowControlProtocolHeader)];
- uint8_t *data = buffer;
+ uint8_t *data = buffer;
- int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader));
- if (status <= 0)
- return status;
+ int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader));
+ if (status <= 0)
+ return status;
- uint32_t value;
- data = decode(data, value);
- hdr->msgType = value;
+ uint32_t value;
+ data = decode(data, value);
+ hdr->msgType = value;
- data = decode(data, value);
- hdr->seqNumber = value;
+ data = decode(data, value);
+ hdr->seqNumber = value;
- data = decode(data, value);
- hdr->status = value;
+ data = decode(data, value);
+ hdr->status = value;
- data = decode(data, value);
- hdr->payloadLen = value;
+ data = decode(data, value);
+ hdr->payloadLen = value;
- return sizeof(FlowControlProtocolHeader);
+ return sizeof(FlowControlProtocolHeader);
}
-int readYAML(char **ymlContent)
-{
- std::ifstream is ("conf/flowServer.yml", std::ifstream::binary);
- if (is) {
- // get length of file:
- is.seekg (0, is.end);
- int length = is.tellg();
- is.seekg (0, is.beg);
+int readYAML(char **ymlContent) {
+ std::ifstream is("conf/flowServer.yml", std::ifstream::binary);
+ if (is) {
+ // get length of file:
+ is.seekg(0, is.end);
+ int length = is.tellg();
+ is.seekg(0, is.beg);
- char * buffer = new char [length];
+ char * buffer = new char[length];
- printf("Reading %s len %d\n", "conf/flowServer.yml", length);
- // read data as a block:
- is.read (buffer,length);
+ printf("Reading %s len %d\n", "conf/flowServer.yml", length);
+ // read data as a block:
+ is.read(buffer, length);
- is.close();
+ is.close();
- // ...buffer contains the entire file...
- *ymlContent = buffer;
+ // ...buffer contains the entire file...
+ *ymlContent = buffer;
- return length;
- }
- return 0;
+ return length;
+ }
+ return 0;
}
static int sockfd = 0, newsockfd = 0;
-void sigHandler(int signal)
-{
- if (signal == SIGINT || signal == SIGTERM)
- {
- close(newsockfd);
- close(sockfd);
- exit(1);
- }
+void sigHandler(int signal) {
+ if (signal == SIGINT || signal == SIGTERM) {
+ close(newsockfd);
+ close(sockfd);
+ exit(1);
+ }
}
-int main(int argc, char *argv[])
-{
- int portno;
- socklen_t clilen;
- struct sockaddr_in serv_addr, cli_addr;
- char buffer[4096];
- int flag = 0;
- int number = 0;
-
- int n;
- if (argc < 2) {
- fprintf(stderr,"ERROR, no port provided\n");
- exit(1);
- }
-
- if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR)
- {
-
- return -1;
- }
- sockfd = socket(AF_INET, SOCK_STREAM, 0);
- if (sockfd < 0)
- error("ERROR opening socket");
- bzero((char *) &serv_addr, sizeof(serv_addr));
- portno = atoi(argv[1]);
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = INADDR_ANY;
- serv_addr.sin_port = htons(portno);
- if (bind(sockfd, (struct sockaddr *) &serv_addr,
- sizeof(serv_addr)) < 0)
- error("ERROR on binding");
- listen(sockfd,5);
- if (portno == DEFAULT_NIFI_SERVER_PORT)
- {
- while (true)
- {
- clilen = sizeof(cli_addr);
- newsockfd = accept(sockfd,
- (struct sockaddr *) &cli_addr,
- &clilen);
- if (newsockfd < 0)
- {
- error("ERROR on accept");
- break;
- }
- // process request
- FlowControlProtocolHeader hdr;
- int status = readHdr(newsockfd, &hdr);
- if (status > 0)
- {
- printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
- printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
- printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
- printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen);
- if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ)
- {
- printf("Flow Control Protocol Register Req receive\n");
- uint8_t *payload = new uint8_t[hdr.payloadLen];
- uint8_t *payloadPtr = payload;
- status = readData(newsockfd, payload, hdr.payloadLen);
- while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
- {
- uint32_t msgID = 0xFFFFFFFF;
- payloadPtr = decode(payloadPtr, msgID);
- if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER)
- {
- // Fixed 8 bytes
- uint8_t seqNum[8];
- memcpy(seqNum, payloadPtr, 8);
- printf("Flow Control Protocol Register Req receive serial num\n");
- payloadPtr += 8;
- }
- else if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME)
- {
- uint32_t len;
- payloadPtr = decode(payloadPtr, len);
- printf("Flow Control Protocol receive YAML name length %d\n", len);
- std::string flowName = (const char *) payloadPtr;
- payloadPtr += len;
- printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str());
- }
- else
- {
- break;
- }
- }
- delete[] payload;
- // Send Register Respond
- // Calculate the total payload msg size
- char *ymlContent;
- uint32_t yamlLen = readYAML(&ymlContent);
- uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
- if (yamlLen > 0)
- payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT, yamlLen);
-
- uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
- uint8_t *data = new uint8_t[size];
- uint8_t *start = data;
-
- // encode the HDR
- hdr.msgType = REGISTER_RESP;
- hdr.payloadLen = payloadSize;
- hdr.status = RESP_SUCCESS;
- data = encode(data, hdr.msgType);
- data = encode(data, hdr.seqNumber);
- data = encode(data, hdr.status);
- data = encode(data, hdr.payloadLen);
-
- // encode the report interval
- data = encode(data, REPORT_INTERVAL);
- data = encode(data, DEFAULT_REPORT_INTERVAL);
-
- // encode the YAML content
- if (yamlLen > 0)
- {
- data = encode(data, FLOW_YAML_CONTENT);
- data = encode(data, yamlLen);
- data = encode(data, (uint8_t *) ymlContent, yamlLen);
- delete[] ymlContent;
- }
-
- // send it
- status = sendData(newsockfd, start, size);
- delete[] start;
- }
- else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ)
- {
- printf("Flow Control Protocol Report Req receive\n");
- uint8_t *payload = new uint8_t[hdr.payloadLen];
- uint8_t *payloadPtr = payload;
- status = readData(newsockfd, payload, hdr.payloadLen);
- while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
- {
- uint32_t msgID = 0xFFFFFFFF;
- payloadPtr = decode(payloadPtr, msgID);
- if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME)
- {
- uint32_t len;
- payloadPtr = decode(payloadPtr, len);
- printf("Flow Control Protocol receive YAML name length %d\n", len);
- std::string flowName = (const char *) payloadPtr;
- payloadPtr += len;
- printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str());
- }
- else
- {
- break;
- }
- }
- delete[] payload;
- // Send Register Respond
- // Calculate the total payload msg size
- std::string processor = "RealTimeDataCollector";
- std::string propertyName1 = "real Time Message ID";
- std::string propertyValue1 = "41";
- std::string propertyName2 = "Batch Message ID";
- std::string propertyValue2 = "172,30,48";
- if (flag == 0)
- {
- propertyName1 = "Real Time Message ID";
- propertyValue1 = "41";
- propertyName2 = "Batch Message ID";
- propertyValue2 = "172,48";
- flag = 1;
- }
- else if (flag == 1)
- {
- propertyName1 = "Real Time Message ID";
- propertyValue1 = "172,48";
- propertyName2 = "Batch Message ID";
- propertyValue2 = "41";
- flag = 0;
- }
- uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1);
-
- uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
- uint8_t *data = new uint8_t[size];
- uint8_t *start = data;
-
- // encode the HDR
- hdr.msgType = REPORT_RESP;
- hdr.payloadLen = payloadSize;
- hdr.status = RESP_SUCCESS;
-
- if (number >= 10 && number < 20)
- {
- // After 10 second report, stop the flow controller for 10 second
- hdr.status = RESP_STOP_FLOW_CONTROLLER;
- }
- else if (number == 20)
- {
- // restart the flow controller after 10 second
- hdr.status = RESP_START_FLOW_CONTROLLER;
- }
- else if (number == 30)
- {
- // retrigger register
- hdr.status = RESP_TRIGGER_REGISTER;
- number = 0;
- }
-
- number++;
-
- data = encode(data, hdr.msgType);
- data = encode(data, hdr.seqNumber);
- data = encode(data, hdr.status);
- data = encode(data, hdr.payloadLen);
-
- // encode the processorName
- data = encode(data, PROCESSOR_NAME);
- data = encode(data, processor);
-
- // encode the propertyName and value TLV
- data = encode(data, PROPERTY_NAME);
- data = encode(data, propertyName1);
- data = encode(data, PROPERTY_VALUE);
- data = encode(data, propertyValue1);
- data = encode(data, PROPERTY_NAME);
- data = encode(data, propertyName2);
- data = encode(data, PROPERTY_VALUE);
- data = encode(data, propertyValue2);
- // send it
- status = sendData(newsockfd, start, size);
- delete[] start;
- }
- }
- close(newsockfd);
- }
- close(sockfd);
- }
- else
- {
- clilen = sizeof(cli_addr);
- newsockfd = accept(sockfd,
- (struct sockaddr *) &cli_addr,
- &clilen);
- if (newsockfd < 0)
- error("ERROR on accept");
- while (1)
- {
- bzero(buffer,4096);
- n = readline(newsockfd,buffer,4095);
- if (n <= 0 )
- {
- close(newsockfd);
- newsockfd = accept(sockfd,
- (struct sockaddr *) &cli_addr,
- &clilen);
- continue;
- }
- printf("%s",buffer);
- }
- close(newsockfd);
- close(sockfd);
- }
- return 0;
+int main(int argc, char *argv[]) {
+ int portno;
+ socklen_t clilen;
+ struct sockaddr_in serv_addr, cli_addr;
+ char buffer[4096];
+ int flag = 0;
+ int number = 0;
+
+ int n;
+ if (argc < 2) {
+ fprintf(stderr, "ERROR, no port provided\n");
+ exit(1);
+ }
+
+ if (signal(SIGINT, sigHandler) == SIG_ERR
+ || signal(SIGTERM, sigHandler) == SIG_ERR) {
+
+ return -1;
+ }
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ if (sockfd < 0)
+ error("ERROR opening socket");
+ bzero((char *) &serv_addr, sizeof(serv_addr));
+ portno = atoi(argv[1]);
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY;
+ serv_addr.sin_port = htons(portno);
+ if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
+ error("ERROR on binding");
+ listen(sockfd, 5);
+ if (portno == DEFAULT_NIFI_SERVER_PORT) {
+ while (true) {
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
+ if (newsockfd < 0) {
+ error("ERROR on accept");
+ break;
+ }
+ // process request
+ FlowControlProtocolHeader hdr;
+ int status = readHdr(newsockfd, &hdr);
+ if (status > 0) {
+ printf("Flow Control Protocol receive MsgType %s\n",
+ FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
+ printf("Flow Control Protocol receive Resp Code %s\n",
+ FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ printf("Flow Control Protocol receive Payload len %d\n",
+ hdr.payloadLen);
+ if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) {
+ printf("Flow Control Protocol Register Req receive\n");
+ uint8_t *payload = new uint8_t[hdr.payloadLen];
+ uint8_t *payloadPtr = payload;
+ status = readData(newsockfd, payload, hdr.payloadLen);
+ while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) {
+ uint32_t msgID = 0xFFFFFFFF;
+ payloadPtr = decode(payloadPtr, msgID);
+ if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER) {
+ // Fixed 8 bytes
+ uint8_t seqNum[8];
+ memcpy(seqNum, payloadPtr, 8);
+ printf("Flow Control Protocol Register Req receive serial num\n");
+ payloadPtr += 8;
+ } else if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) {
+ uint32_t len;
+ payloadPtr = decode(payloadPtr, len);
+ printf("Flow Control Protocol receive YAML name length %d\n",
+ len);
+ std::string flowName = (const char *) payloadPtr;
+ payloadPtr += len;
+ printf("Flow Control Protocol receive YAML name %s\n",
+ flowName.c_str());
+ } else {
+ break;
+ }
+ }
+ delete[] payload;
+ // Send Register Respond
+ // Calculate the total payload msg size
+ char *ymlContent;
+ uint32_t yamlLen = readYAML(&ymlContent);
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL,
+ 0);
+ if (yamlLen > 0)
+ payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT,
+ yamlLen);
+
+ uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+ uint8_t *data = new uint8_t[size];
+ uint8_t *start = data;
+
+ // encode the HDR
+ hdr.msgType = REGISTER_RESP;
+ hdr.payloadLen = payloadSize;
+ hdr.status = RESP_SUCCESS;
+ data = encode(data, hdr.msgType);
+ data = encode(data, hdr.seqNumber);
+ data = encode(data, hdr.status);
+ data = encode(data, hdr.payloadLen);
+
+ // encode the report interval
+ data = encode(data, REPORT_INTERVAL);
+ data = encode(data, DEFAULT_REPORT_INTERVAL);
+
+ // encode the YAML content
+ if (yamlLen > 0) {
+ data = encode(data, FLOW_YAML_CONTENT);
+ data = encode(data, yamlLen);
+ data = encode(data, (uint8_t *) ymlContent, yamlLen);
+ delete[] ymlContent;
+ }
+
+ // send it
+ status = sendData(newsockfd, start, size);
+ delete[] start;
+ } else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ) {
+ printf("Flow Control Protocol Report Req receive\n");
+ uint8_t *payload = new uint8_t[hdr.payloadLen];
+ uint8_t *payloadPtr = payload;
+ status = readData(newsockfd, payload, hdr.payloadLen);
+ while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) {
+ uint32_t msgID = 0xFFFFFFFF;
+ payloadPtr = decode(payloadPtr, msgID);
+ if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) {
+ uint32_t len;
+ payloadPtr = decode(payloadPtr, len);
+ printf("Flow Control Protocol receive YAML name length %d\n",
+ len);
+ std::string flowName = (const char *) payloadPtr;
+ payloadPtr += len;
+ printf("Flow Control Protocol receive YAML name %s\n",
+ flowName.c_str());
+ } else {
+ break;
+ }
+ }
+ delete[] payload;
+ // Send Register Respond
+ // Calculate the total payload msg size
+ std::string processor = "RealTimeDataCollector";
+ std::string propertyName1 = "real Time Message ID";
+ std::string propertyValue1 = "41";
+ std::string propertyName2 = "Batch Message ID";
+ std::string propertyValue2 = "172,30,48";
+ if (flag == 0) {
+ propertyName1 = "Real Time Message ID";
+ propertyValue1 = "41";
+ propertyName2 = "Batch Message ID";
+ propertyValue2 = "172,48";
+ flag = 1;
+ } else if (flag == 1) {
+ propertyName1 = "Real Time Message ID";
+ propertyValue1 = "172,48";
+ propertyName2 = "Batch Message ID";
+ propertyValue2 = "41";
+ flag = 0;
+ }
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(
+ PROCESSOR_NAME, processor.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME,
+ propertyName1.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE,
+ propertyValue1.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME,
+ propertyName2.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE,
+ propertyValue2.size() + 1);
+
+ uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+ uint8_t *data = new uint8_t[size];
+ uint8_t *start = data;
+
+ // encode the HDR
+ hdr.msgType = REPORT_RESP;
+ hdr.payloadLen = payloadSize;
+ hdr.status = RESP_SUCCESS;
+
+ if (number >= 10 && number < 20) {
+ // After 10 second report, stop the flow controller for 10 second
+ hdr.status = RESP_STOP_FLOW_CONTROLLER;
+ } else if (number == 20) {
+ // restart the flow controller after 10 second
+ hdr.status = RESP_START_FLOW_CONTROLLER;
+ } else if (number == 30) {
+ // retrigger register
+ hdr.status = RESP_TRIGGER_REGISTER;
+ number = 0;
+ }
+
+ number++;
+
+ data = encode(data, hdr.msgType);
+ data = encode(data, hdr.seqNumber);
+ data = encode(data, hdr.status);
+ data = encode(data, hdr.payloadLen);
+
+ // encode the processorName
+ data = encode(data, PROCESSOR_NAME);
+ data = encode(data, processor);
+
+ // encode the propertyName and value TLV
+ data = encode(data, PROPERTY_NAME);
+ data = encode(data, propertyName1);
+ data = encode(data, PROPERTY_VALUE);
+ data = encode(data, propertyValue1);
+ data = encode(data, PROPERTY_NAME);
+ data = encode(data, propertyName2);
+ data = encode(data, PROPERTY_VALUE);
+ data = encode(data, propertyValue2);
+ // send it
+ status = sendData(newsockfd, start, size);
+ delete[] start;
+ }
+ }
+ close(newsockfd);
+ }
+ close(sockfd);
+ } else {
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
+ if (newsockfd < 0)
+ error("ERROR on accept");
+ while (1) {
+ bzero(buffer, 4096);
+ n = readline(newsockfd, buffer, 4095);
+ if (n <= 0) {
+ close(newsockfd);
+ newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
+ continue;
+ }
+ printf("%s", buffer);
+ }
+ close(newsockfd);
+ close(sockfd);
+ }
+ return 0;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 0b3ca3e..18d5d01 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -73,11 +73,13 @@ class TestController {
}
}
- void setDebugToConsole(std::shared_ptr<org::apache::nifi::minifi::Configure> configure) {
+ void setDebugToConsole(
+ std::shared_ptr<org::apache::nifi::minifi::Configure> configure) {
std::ostringstream oss;
std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
logging::BaseLogger>(
- new org::apache::nifi::minifi::core::logging::OutputStreamAppender(std::cout, configure));
+ new org::apache::nifi::minifi::core::logging::OutputStreamAppender(
+ std::cout, configure));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
logger->updateLogger(std::move(outputLogger));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
index 00add78..9bfee2b 100644
--- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
+++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp
@@ -21,6 +21,7 @@
#include <fstream>
#include <memory>
#include <string>
+#include <utility>
#include <thread>
#include <type_traits>
#include "core/logging/LogAppenders.h"
@@ -43,8 +44,6 @@
REGISTER_RESOURCE(MockControllerService);
REGISTER_RESOURCE(MockProcessor);
-std::string test_file_location;
-std::string key_dir;
std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(
std::shared_ptr<core::controller::ControllerServiceProvider> provider,
@@ -63,6 +62,8 @@ void waitToVerifyProcessor() {
}
int main(int argc, char **argv) {
+ std::string test_file_location;
+ std::string key_dir;
if (argc > 2) {
test_file_location = argv[1];
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/integration/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpGetIntegrationTest.cpp b/libminifi/test/integration/HttpGetIntegrationTest.cpp
index aa24dfe..d4eabf5 100644
--- a/libminifi/test/integration/HttpGetIntegrationTest.cpp
+++ b/libminifi/test/integration/HttpGetIntegrationTest.cpp
@@ -16,7 +16,9 @@
* limitations under the License.
*/
+#include <sys/stat.h>
#include <cassert>
+#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
@@ -24,8 +26,7 @@
#include <thread>
#include <type_traits>
#include <vector>
-#include <sys/stat.h>
-
+#include "../TestBase.h"
#include "utils/StringUtils.h"
#include "core/Core.h"
#include "core/logging/LogAppenders.h"
@@ -38,20 +39,18 @@
#include "../unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
-
void waitToVerifyProcessor() {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
int main(int argc, char **argv) {
-
- std::string key_dir,test_file_location;
+ std::string key_dir, test_file_location;
if (argc > 1) {
test_file_location = argv[1];
key_dir = argv[2];
}
std::shared_ptr<minifi::Configure> configuration = std::make_shared<
- minifi::Configure>();
+ minifi::Configure>();
configuration->set(minifi::Configure::nifi_default_directory, key_dir);
mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
std::ostringstream oss;
@@ -61,7 +60,7 @@ int main(int argc, char **argv) {
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
logger->updateLogger(std::move(outputLogger));
- logger->setLogLevel("trace");
+ logger->setLogLevel("debug");
std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
@@ -82,12 +81,11 @@ int main(int argc, char **argv) {
std::shared_ptr<minifi::FlowController> controller = std::make_shared<
minifi::FlowController>(test_repo, test_flow_repo, configuration,
- std::move(yaml_ptr),
- DEFAULT_ROOT_GROUP_NAME,
+ std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME,
true);
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,configuration,
- test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
+ configuration, test_file_location);
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
test_file_location);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/integration/HttpPostIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpPostIntegrationTest.cpp b/libminifi/test/integration/HttpPostIntegrationTest.cpp
index 45abd34..8e6ebec 100644
--- a/libminifi/test/integration/HttpPostIntegrationTest.cpp
+++ b/libminifi/test/integration/HttpPostIntegrationTest.cpp
@@ -16,16 +16,16 @@
* limitations under the License.
*/
+#include <sys/stat.h>
#include <cassert>
#include <chrono>
#include <fstream>
+#include <utility>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
-#include <sys/stat.h>
-
#include "utils/StringUtils.h"
#include "core/Core.h"
#include "core/logging/LogAppenders.h"
@@ -37,16 +37,14 @@
#include "properties/Configure.h"
#include "../unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
-#include "properties/Configure.h"
-std::string test_file_location;
void waitToVerifyProcessor() {
std::this_thread::sleep_for(std::chrono::seconds(2));
}
int main(int argc, char **argv) {
-
+ std::string test_file_location;
if (argc > 1) {
test_file_location = argv[1];
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/integration/TestExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/TestExecuteProcess.cpp b/libminifi/test/integration/TestExecuteProcess.cpp
index a7c6da6..e947a46 100644
--- a/libminifi/test/integration/TestExecuteProcess.cpp
+++ b/libminifi/test/integration/TestExecuteProcess.cpp
@@ -17,6 +17,9 @@
*/
#include <uuid/uuid.h>
+#include <utility>
+#include <memory>
+#include <vector>
#include <fstream>
#include "../unit/ProvenanceTestHelper.h"
@@ -34,9 +37,7 @@
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
-int main(int argc, char **argv)
-{
-
+int main(int argc, char **argv) {
std::ostringstream oss;
std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
logging::BaseLogger>(
@@ -45,10 +46,9 @@ int main(int argc, char **argv)
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
logger->updateLogger(std::move(outputLogger));
-
outputLogger = std::unique_ptr<logging::BaseLogger>(
- new org::apache::nifi::minifi::core::logging::NullAppender());
- logger->updateLogger(std::move(outputLogger));
+ new org::apache::nifi::minifi::core::logging::NullAppender());
+ logger->updateLogger(std::move(outputLogger));
std::shared_ptr<core::Processor> processor = std::make_shared<
org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
@@ -89,49 +89,40 @@ int main(int argc, char **argv)
std::vector<std::thread> processor_workers;
core::ProcessorNode node2(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider =
+ nullptr;
std::shared_ptr<core::ProcessContext> contextset = std::make_shared<
- core::ProcessContext>(node2,controller_services_provider, test_repo);
+ core::ProcessContext>(node2, controller_services_provider, test_repo);
core::ProcessSessionFactory factory(contextset.get());
processor->onSchedule(contextset.get(), &factory);
for (int i = 0; i < 1; i++) {
- //
processor_workers.push_back(
std::thread(
- [processor,test_repo,&is_ready]()
- {
+ [processor, test_repo, &is_ready]() {
core::ProcessorNode node(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node,controller_services_provider, test_repo);
- context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,"sleep 0.5");
- //context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::CommandArguments," 1 >>" + ss.str());
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo);
+ context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5");
std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context.get());
- while(!is_ready.load(std::memory_order_relaxed)) {
-
+ while (!is_ready.load(std::memory_order_relaxed)) {
}
-
processor->onTrigger(context.get(), session.get());
-
}));
}
is_ready.store(true, std::memory_order_relaxed);
- //is_ready.store(true);
std::for_each(processor_workers.begin(), processor_workers.end(),
- [](std::thread &t)
- {
+ [](std::thread &t) {
t.join();
});
- outputLogger = std::unique_ptr<logging::BaseLogger>(
+ outputLogger = std::unique_ptr<logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(outputLogger));
-
std::shared_ptr<org::apache::nifi::minifi::processors::ExecuteProcess> execp =
std::static_pointer_cast<
org::apache::nifi::minifi::processors::ExecuteProcess>(processor);
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/nodefs/NoLevelDB.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/nodefs/NoLevelDB.cpp b/libminifi/test/nodefs/NoLevelDB.cpp
index 09b4916..677886e 100644
--- a/libminifi/test/nodefs/NoLevelDB.cpp
+++ b/libminifi/test/nodefs/NoLevelDB.cpp
@@ -17,7 +17,7 @@
*/
#include "../TestBase.h"
-
+#include <memory>
#include "core/Core.h"
#include "core/RepositoryFactory.h"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/nodefs/NoYamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/nodefs/NoYamlConfiguration.cpp b/libminifi/test/nodefs/NoYamlConfiguration.cpp
index c720264..9a979a3 100644
--- a/libminifi/test/nodefs/NoYamlConfiguration.cpp
+++ b/libminifi/test/nodefs/NoYamlConfiguration.cpp
@@ -16,23 +16,20 @@
* limitations under the License.
*/
-
#include "core/Core.h"
+#include <memory>
#include "core/RepositoryFactory.h"
-
#include "core/ConfigurationFactory.h"
TEST_CASE("NoYamlSupport1", "[NoYamlSupport1]") {
std::shared_ptr<core::Repository> prov_repo = core::createRepository(
"provenancerepository", true);
-REQUIRE(nullptr != prov_repo);
-std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move(
+ REQUIRE(nullptr != prov_repo);
+ std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move(
core::createFlowConfiguration(prov_repo, prov_repo, std::make_shared<minifi::Configure>(), std::make_shared<minifi::io::StreamFactory>(false),
- "yamlconfiguration"));
-
+ "yamlconfiguration"));
REQUIRE(nullptr != flow_configuration);
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/CRCTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/CRCTests.cpp b/libminifi/test/unit/CRCTests.cpp
index 74279f4..d2b0466 100644
--- a/libminifi/test/unit/CRCTests.cpp
+++ b/libminifi/test/unit/CRCTests.cpp
@@ -25,18 +25,14 @@
#include "../TestBase.h"
TEST_CASE("Test CRC1", "[testcrc1]") {
-
org::apache::nifi::minifi::io::BaseStream base;
org::apache::nifi::minifi::io::CRCStream<
org::apache::nifi::minifi::io::BaseStream> test(&base);
- test.writeData((uint8_t*) "cow", 3);
+ test.writeData(reinterpret_cast<uint8_t*>(const_cast<char*>("cow")), 3);
REQUIRE(2580823964 == test.getCRC());
-
-
}
TEST_CASE("Test CRC2", "[testcrc2]") {
-
org::apache::nifi::minifi::io::BaseStream base;
org::apache::nifi::minifi::io::CRCStream<
org::apache::nifi::minifi::io::BaseStream> test(&base);
@@ -44,38 +40,31 @@ TEST_CASE("Test CRC2", "[testcrc2]") {
std::vector<uint8_t> charvect(fox.begin(), fox.end());
test.writeData(charvect, charvect.size());
REQUIRE(1922388889 == test.getCRC());
-
}
TEST_CASE("Test CRC3", "[testcrc3]") {
-
org::apache::nifi::minifi::io::BaseStream base;
org::apache::nifi::minifi::io::CRCStream<
org::apache::nifi::minifi::io::BaseStream> test(&base);
uint64_t number = 7;
test.write(number);
REQUIRE(4215687882 == test.getCRC());
-
}
TEST_CASE("Test CRC4", "[testcrc4]") {
-
org::apache::nifi::minifi::io::BaseStream base;
org::apache::nifi::minifi::io::CRCStream<
org::apache::nifi::minifi::io::BaseStream> test(&base);
uint32_t number = 7;
test.write(number);
REQUIRE(3206564543 == test.getCRC());
-
}
TEST_CASE("Test CRC5", "[testcrc5]") {
-
org::apache::nifi::minifi::io::BaseStream base;
org::apache::nifi::minifi::io::CRCStream<
org::apache::nifi::minifi::io::BaseStream> test(&base);
uint16_t number = 7;
test.write(number);
REQUIRE(3753740124 == test.getCRC());
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/ClassLoaderTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ClassLoaderTests.cpp b/libminifi/test/unit/ClassLoaderTests.cpp
index d2d2664..7928832 100644
--- a/libminifi/test/unit/ClassLoaderTests.cpp
+++ b/libminifi/test/unit/ClassLoaderTests.cpp
@@ -17,7 +17,6 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include <uuid/uuid.h>
-#include "core/ClassLoader.h"
#include "../TestBase.h"
#include "io/ClientSocket.h"
#include "core/Processor.h"
@@ -25,12 +24,21 @@
#include "processors/AppendHostInfo.h"
#include "core/logging/LogAppenders.h"
-using namespace org::apache::nifi::minifi::io;
TEST_CASE("TestLoader", "[TestLoader]") {
-
-REQUIRE ( nullptr != core::ClassLoader::getDefaultClassLoader().instantiate("AppendHostInfo","hosty"));
-REQUIRE ( nullptr != core::ClassLoader::getDefaultClassLoader().instantiate("ListenHTTP","hosty2"));
-REQUIRE ( nullptr == core::ClassLoader::getDefaultClassLoader().instantiate("Don'tExist","hosty3"));
-REQUIRE ( nullptr == core::ClassLoader::getDefaultClassLoader().instantiate("","EmptyEmpty"));
-
+ REQUIRE(
+ nullptr
+ != core::ClassLoader::getDefaultClassLoader().instantiate(
+ "AppendHostInfo", "hosty"));
+ REQUIRE(
+ nullptr
+ != core::ClassLoader::getDefaultClassLoader().instantiate(
+ "ListenHTTP", "hosty2"));
+ REQUIRE(
+ nullptr
+ == core::ClassLoader::getDefaultClassLoader().instantiate(
+ "Don'tExist", "hosty3"));
+ REQUIRE(
+ nullptr
+ == core::ClassLoader::getDefaultClassLoader().instantiate(
+ "", "EmptyEmpty"));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/ControllerServiceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ControllerServiceTests.cpp b/libminifi/test/unit/ControllerServiceTests.cpp
index d657d87..508e37f 100644
--- a/libminifi/test/unit/ControllerServiceTests.cpp
+++ b/libminifi/test/unit/ControllerServiceTests.cpp
@@ -17,6 +17,8 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include <uuid/uuid.h>
+#include <memory>
+#include <string>
#include <fstream>
#include "FlowController.h"
#include "../TestBase.h"
@@ -50,8 +52,8 @@ TEST_CASE("Test ControllerServicesMap", "[cs1]") {
REQUIRE(nullptr != map.getControllerServiceNode("ID"));
- REQUIRE(false== map.put("",testNode));
- REQUIRE(false== map.put("",nullptr));
+ REQUIRE(false== map.put("", testNode));
+ REQUIRE(false== map.put("", nullptr));
// ensure the pointer is the same
@@ -86,5 +88,4 @@ std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(
return testNode;
}
-
-}
+} /** namespace ControllerServiceTests **/
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/InvokeHTTPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp
index f5df8d8..458a1c7 100644
--- a/libminifi/test/unit/InvokeHTTPTests.cpp
+++ b/libminifi/test/unit/InvokeHTTPTests.cpp
@@ -19,6 +19,11 @@
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include <uuid/uuid.h>
#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
#include "FlowController.h"
#include "../TestBase.h"
#include "core/logging/LogAppenders.h"
@@ -33,7 +38,6 @@
#include "core/ProcessorNode.h"
TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
-
std::stringstream oss;
std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
logging::BaseLogger>(
@@ -146,14 +150,13 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
}
std::shared_ptr<core::FlowFile> ffr = session2.get();
std::string log_attribute_output = oss.str();
+ std::cout << log_attribute_output << std::endl;
REQUIRE(
log_attribute_output.find("exiting because method is POST")
!= std::string::npos);
-
}
TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
-
std::stringstream oss;
std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
logging::BaseLogger>(
@@ -280,16 +283,13 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
REQUIRE(
log_attribute_output.find("exiting because method is POST")
!= std::string::npos);
-
}
class CallBack : public minifi::OutputStreamCallback {
public:
CallBack() {
-
}
virtual ~CallBack() {
-
}
virtual void process(std::ofstream *stream) {
std::string st = "we're gnna write some test stuff";
@@ -298,7 +298,6 @@ class CallBack : public minifi::OutputStreamCallback {
};
TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
-
std::stringstream oss;
std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
logging::BaseLogger>(
@@ -442,6 +441,5 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
REQUIRE(
log_attribute_output.find("exiting because method is POST")
!= std::string::npos);
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/LoggerTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index b083671..9139be2 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -16,13 +16,13 @@
* limitations under the License.
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include <utility>
+#include <string>
#include <memory>
#include <ctime>
#include "../TestBase.h"
#include "core/logging/LogAppenders.h"
-using namespace logging;
-
bool contains(std::string stringA, std::string ending) {
return (ending.length() > 0 && stringA.find(ending) != std::string::npos);
}
@@ -30,7 +30,8 @@ bool contains(std::string stringA, std::string ending) {
TEST_CASE("Test log Levels", "[ttl1]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -39,12 +40,10 @@ TEST_CASE("Test log Levels", "[ttl1]") {
logger->log_info("hello world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [info] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [info] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -53,7 +52,8 @@ TEST_CASE("Test log Levels", "[ttl1]") {
TEST_CASE("Test log Levels debug", "[ttl2]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -62,12 +62,10 @@ TEST_CASE("Test log Levels debug", "[ttl2]") {
logger->log_debug("hello world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [debug] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [debug] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -76,7 +74,8 @@ TEST_CASE("Test log Levels debug", "[ttl2]") {
TEST_CASE("Test log Levels trace", "[ttl3]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -86,12 +85,10 @@ TEST_CASE("Test log Levels trace", "[ttl3]") {
logger->log_trace("hello world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [trace] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [trace] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -100,7 +97,8 @@ TEST_CASE("Test log Levels trace", "[ttl3]") {
TEST_CASE("Test log Levels error", "[ttl4]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -110,12 +108,10 @@ TEST_CASE("Test log Levels error", "[ttl4]") {
logger->log_error("hello world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -124,7 +120,8 @@ TEST_CASE("Test log Levels error", "[ttl4]") {
TEST_CASE("Test log Levels change", "[ttl5]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -134,10 +131,7 @@ TEST_CASE("Test log Levels change", "[ttl5]") {
logger->log_error("hello world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
oss.str("");
oss.clear();
REQUIRE(0 == oss.str().length());
@@ -147,20 +141,20 @@ TEST_CASE("Test log Levels change", "[ttl5]") {
REQUIRE(0 == oss.str().length());
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
-
}
-
TEST_CASE("Test log LevelsConfigured", "[ttl6]") {
std::ostringstream oss;
- std::shared_ptr<minifi::Configure> config = std::make_shared<minifi::Configure>();
+ std::shared_ptr<minifi::Configure> config =
+ std::make_shared<minifi::Configure>();
- config->set(BaseLogger::nifi_log_appender, "OutputStreamAppender");
+ config->set(logging::BaseLogger::nifi_log_appender, "OutputStreamAppender");
config->set(
org::apache::nifi::minifi::core::logging::OutputStreamAppender::nifi_log_output_stream_error_stderr,
"true");
@@ -170,8 +164,8 @@ TEST_CASE("Test log LevelsConfigured", "[ttl6]") {
auto oldrdbuf = std::cerr.rdbuf();
std::cerr.rdbuf(oss.rdbuf());
- std::unique_ptr<BaseLogger> newLogger = LogInstance::getConfiguredLogger(
- config);
+ std::unique_ptr<logging::BaseLogger> newLogger =
+ logging::LogInstance::getConfiguredLogger(config);
logger->updateLogger(std::move(newLogger));
@@ -181,16 +175,13 @@ TEST_CASE("Test log LevelsConfigured", "[ttl6]") {
logger->log_error("hello world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
std::cerr.rdbuf(oldrdbuf);
- config->set(BaseLogger::nifi_log_appender, "nullappender");
+ config->set(logging::BaseLogger::nifi_log_appender, "nullappender");
- newLogger = LogInstance::getConfiguredLogger(config);
+ newLogger = logging::LogInstance::getConfiguredLogger(config);
logger->updateLogger(std::move(newLogger));
@@ -204,13 +195,13 @@ TEST_CASE("Test log LevelsConfigured", "[ttl6]") {
logger->log_trace("hello world");
REQUIRE(0 == oss.str().length());
-
}
TEST_CASE("Test log Levels With std::string", "[ttl1]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -220,10 +211,7 @@ TEST_CASE("Test log Levels With std::string", "[ttl1]") {
logger->log_error("hello %s", world);
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
oss.str("");
oss.clear();
REQUIRE(0 == oss.str().length());
@@ -233,17 +221,18 @@ TEST_CASE("Test log Levels With std::string", "[ttl1]") {
REQUIRE(0 == oss.str().length());
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
-
}
TEST_CASE("Test log Levels debug With std::string ", "[ttl2]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -253,12 +242,10 @@ TEST_CASE("Test log Levels debug With std::string ", "[ttl2]") {
logger->log_debug("hello %s", world);
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [debug] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [debug] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -267,7 +254,8 @@ TEST_CASE("Test log Levels debug With std::string ", "[ttl2]") {
TEST_CASE("Test log Levels trace With std::string", "[ttl3]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -277,12 +265,10 @@ TEST_CASE("Test log Levels trace With std::string", "[ttl3]") {
logger->log_trace("hello %s", world);
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [trace] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [trace] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -291,7 +277,8 @@ TEST_CASE("Test log Levels trace With std::string", "[ttl3]") {
TEST_CASE("Test log Levels error With std::string ", "[ttl4]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -302,12 +289,10 @@ TEST_CASE("Test log Levels error With std::string ", "[ttl4]") {
logger->log_error("hello %s", world);
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
@@ -316,7 +301,8 @@ TEST_CASE("Test log Levels error With std::string ", "[ttl4]") {
TEST_CASE("Test log Levels change With std::string ", "[ttl5]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -327,10 +313,7 @@ TEST_CASE("Test log Levels change With std::string ", "[ttl5]") {
logger->log_error("hello %s", world);
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
oss.str("");
oss.clear();
REQUIRE(0 == oss.str().length());
@@ -340,17 +323,18 @@ TEST_CASE("Test log Levels change With std::string ", "[ttl5]") {
REQUIRE(0 == oss.str().length());
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
-
}
TEST_CASE("Test log Levels change With std::string maybe ", "[ttl5]") {
std::ostringstream oss;
- std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
0));
std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
@@ -360,10 +344,7 @@ TEST_CASE("Test log Levels change With std::string maybe ", "[ttl5]") {
logger->log_error("hello %s", "world");
REQUIRE(
- true
- == contains(
- oss.str(),
- "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
+ true == contains(oss.str(), "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world"));
oss.str("");
oss.clear();
REQUIRE(0 == oss.str().length());
@@ -373,9 +354,9 @@ TEST_CASE("Test log Levels change With std::string maybe ", "[ttl5]") {
REQUIRE(0 == oss.str().length());
- std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>(
+ std::unique_ptr<logging::BaseLogger> nullAppender = std::unique_ptr<
+ logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(nullAppender));
-
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/63c53bcf/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 1c447a8..82f9cae 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -1,4 +1,3 @@
-
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -18,6 +17,11 @@
*/
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include <uuid/uuid.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
#include <fstream>
#include "../unit/ProvenanceTestHelper.h"
#include "../TestBase.h"
@@ -34,16 +38,13 @@
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-
-
TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
std::shared_ptr<core::Processor> processor = std::make_shared<
- org::apache::nifi::minifi::processors::GetFile>("processorname");
+ org::apache::nifi::minifi::processors::GetFile>("processorname");
REQUIRE(processor->getName() == "processorname");
}
TEST_CASE("Test Find file", "[getfileCreate2]") {
-
TestController testController;
testController.enableDebug();
@@ -53,7 +54,9 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
std::shared_ptr<core::Processor> processorReport =
std::make_shared<
- org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()));
+ org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
+ std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(
+ std::make_shared<org::apache::nifi::minifi::Configure>()));
std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
@@ -131,8 +134,9 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
for (auto entry : repo->getRepoMap()) {
provenance::ProvenanceEventRecord newRecord;
- newRecord.DeSerialize((uint8_t*) entry.second.data(),
- entry.second.length());
+ newRecord.DeSerialize(
+ reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())),
+ entry.second.length());
bool found = false;
for (auto provRec : records) {
@@ -146,13 +150,14 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
break;
}
}
- if (!found)
+ if (!found) {
throw std::runtime_error("Did not find record");
-
+ }
}
core::ProcessorNode nodeReport(processorReport);
- core::ProcessContext contextReport(nodeReport,controller_services_provider, test_repo);
+ core::ProcessContext contextReport(nodeReport, controller_services_provider,
+ test_repo);
core::ProcessSessionFactory factoryReport(&contextReport);
core::ProcessSession sessionReport(&contextReport);
processorReport->onSchedule(&contextReport, &factoryReport);
@@ -180,7 +185,6 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
}
TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
-
TestController testController;
testController.enableDebug();
@@ -227,7 +231,6 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
int prev = 0;
for (int i = 0; i < 10; i++) {
-
core::ProcessSession session(&context);
REQUIRE(processor->getName() == "getfileCreate2");
@@ -268,9 +271,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
REQUIRE((repo->getRepoMap().size() % 2) == 0);
REQUIRE(repo->getRepoMap().size() == (prev + 2));
prev += 2;
-
}
-
}
TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
@@ -382,7 +383,6 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
logAttribute->onTrigger(&context2, &session2);
- //session2.commit();
records = reporter->getEvents();
std::string log_attribute_output = oss.str();
@@ -397,7 +397,6 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
outputLogger = std::unique_ptr<logging::BaseLogger>(
new org::apache::nifi::minifi::core::logging::NullAppender());
logger->updateLogger(std::move(outputLogger));
-
}
int fileSize(const char *add) {