You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/07/27 16:43:41 UTC
[2/6] nifi-minifi-cpp git commit: MINIFI-249: Update prov repo to
better abstract deser.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpGetIntegrationTest.cpp b/libminifi/test/integration/HttpGetIntegrationTest.cpp
index ae60dc1..a235759 100644
--- a/libminifi/test/integration/HttpGetIntegrationTest.cpp
+++ b/libminifi/test/integration/HttpGetIntegrationTest.cpp
@@ -26,6 +26,7 @@
#include <thread>
#include <type_traits>
#include <vector>
+#include "../TestServer.h"
#include "../TestBase.h"
#include "utils/StringUtils.h"
#include "core/Core.h"
@@ -41,9 +42,23 @@ void waitToVerifyProcessor() {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
+int log_message(const struct mg_connection *conn, const char *message) {  // Log callback (assigned to callback.log_message in main): prints the message to stdout; conn is not used.
+ puts(message);
+ return 1;  // NOTE(review): presumably signals to the web server that the message was handled - confirm against the mg_callbacks documentation.
+}
+
+int ssl_enable(void *ssl_context, void *user_data) {  // SSL-init callback (assigned to callback.init_ssl in main); it configures nothing itself and user_data is not used.
+ struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context;  // NOTE(review): ctx is never read after this cast.
+ return 0;  // NOTE(review): the meaning of 0 is defined by the web server's init_ssl contract - confirm against the mg_callbacks documentation.
+}
+
int main(int argc, char **argv) {
- LogTestController::getInstance().setInfo<minifi::processors::InvokeHTTP>();
- LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+ init_webserver();
+ LogTestController::getInstance().setDebug<core::Processor>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>();
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::string key_dir, test_file_location;
if (argc > 1) {
test_file_location = argv[1];
@@ -59,27 +74,61 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+ content_repo->initialize(configuration);
+
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME,
- true);
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+ content_repo,
+ DEFAULT_ROOT_GROUP_NAME,
+ true);
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
+ std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke");
+ assert(proc != nullptr);
+ std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+ assert(inv != nullptr);
+ std::string url = "";
+ inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+ ptr.release();
+ std::string port, scheme, path;
+ parse_http_components(url, port, scheme, path);
+ struct mg_callbacks callback;
+ if (url.find("localhost") != std::string::npos) {
+ if (scheme == "https") {
+ std::string cert = "";
+ cert = key_dir + "nifi-cert.pem";
+ memset(&callback, 0, sizeof(callback));
+ callback.init_ssl = ssl_enable;
+ callback.log_message = log_message;
+ std::cout << cert << std::endl;
+ start_webserver(port, path, "hi this is a get test", &callback, cert);
+ } else {
+ start_webserver(port, path, "hi this is a get test");
+ }
+ }
controller->load();
controller->start();
waitToVerifyProcessor();
controller->waitUnload(60000);
+ if (url.find("localhost") != std::string::npos) {
+ stop_webserver();
+ }
std::string logs = LogTestController::getInstance().log_output.str();
+
assert(logs.find("key:filename value:") != std::string::npos);
- assert(logs.find("key:invokehttp.request.url value:https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c") != std::string::npos);
- assert(logs.find("Size:3734 Offset:0") != std::string::npos);
+ assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos);
assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos);
std::string stringtofind = "Resource Claim created ./content_repository/";
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/HttpPostIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/HttpPostIntegrationTest.cpp b/libminifi/test/integration/HttpPostIntegrationTest.cpp
index dfa284f..9a46574 100644
--- a/libminifi/test/integration/HttpPostIntegrationTest.cpp
+++ b/libminifi/test/integration/HttpPostIntegrationTest.cpp
@@ -28,6 +28,7 @@
#include <vector>
#include "utils/StringUtils.h"
#include "core/Core.h"
+#include "../TestServer.h"
#include "../include/core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
@@ -42,6 +43,7 @@ void waitToVerifyProcessor() {
}
int main(int argc, char **argv) {
+ init_webserver();
LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
std::string test_file_location;
@@ -51,7 +53,6 @@ int main(int argc, char **argv) {
mkdir("/tmp/aljr39/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
std::ofstream myfile;
myfile.open("/tmp/aljr39/example.txt");
- myfile << "Hello world" << std::endl;
myfile.close();
mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
@@ -62,31 +63,44 @@ int main(int argc, char **argv) {
configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(configuration);
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
- DEFAULT_ROOT_GROUP_NAME,
- true);
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+ true);
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
+ std::shared_ptr<core::Processor> proc = ptr->findProcessor("OhJeez");
+ assert(proc != nullptr);
+
+ std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+ assert(inv != nullptr);
+ std::string url = "";
+ inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+ ptr.release();
+ std::string port, scheme, path;
+ parse_http_components(url, port, scheme, path);
+ start_webserver(port, path, "hi this is a post test");
controller->load();
controller->start();
waitToVerifyProcessor();
controller->waitUnload(60000);
+ std::string logs = LogTestController::getInstance().log_output.str();
+ // stop webserver
+ stop_webserver();
assert(LogTestController::getInstance().contains("curl performed") == true);
- assert(LogTestController::getInstance().contains("Import offset 0 length 12") == true);
+ assert(LogTestController::getInstance().contains("Import offset 0 length 22") == true);
std::string stringtofind = "Resource Claim created ./content_repository/";
- std::string logs = LogTestController::getInstance().log_output.str();
size_t loc = logs.find(stringtofind);
while (loc > 0 && loc != std::string::npos) {
std::string id = logs.substr(loc + stringtofind.size(), 36);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/ProvenanceReportingTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index a7bcc2b..a6dc377 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -53,21 +53,20 @@ int main(int argc, char **argv) {
LogTestController::getInstance().setDebug<core::ProcessGroup>();
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration);
-
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
- DEFAULT_ROOT_GROUP_NAME,
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
true);
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/Site2SiteRestTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp b/libminifi/test/integration/Site2SiteRestTest.cpp
index 01aa7a8..1773cdb 100644
--- a/libminifi/test/integration/Site2SiteRestTest.cpp
+++ b/libminifi/test/integration/Site2SiteRestTest.cpp
@@ -45,22 +45,22 @@ void waitToVerifyProcessor() {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
-class ConfigHandler: public CivetHandler {
+class ConfigHandler : public CivetHandler {
public:
bool handleGet(CivetServer *server, struct mg_connection *conn) {
static const std::string site2site_rest_resp = "{"
- "\"revision\": {"
- "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
- "},"
- "\"controller\": {"
- "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
- "\"name\": \"NiFi Flow\","
- "\"remoteSiteListeningPort\": 10001,"
- "\"siteToSiteSecure\": false"
- "}}";
+ "\"revision\": {"
+ "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
+ "},"
+ "\"controller\": {"
+ "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
+ "\"name\": \"NiFi Flow\","
+ "\"remoteSiteListeningPort\": 10001,"
+ "\"siteToSiteSecure\": false"
+ "}}";
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- site2site_rest_resp.length());
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ site2site_rest_resp.length());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
@@ -71,7 +71,7 @@ int main(int argc, char **argv) {
LogTestController::getInstance().setInfo<minifi::FlowController>();
const char *options[] = { "document_root", ".", "listening_ports", "8082", 0 };
- std::vector < std::string > cpp_options;
+ std::vector<std::string> cpp_options;
for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
cpp_options.push_back(options[i]);
}
@@ -106,28 +106,31 @@ int main(int argc, char **argv) {
TestFlowRepository>();
configuration->set(minifi::Configure::nifi_flow_configuration_file,
- test_file_location);
+ test_file_location);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
- < minifi::io::StreamFactory > (configuration);
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
- < core::YamlConfiguration
- > (new core::YamlConfiguration(test_repo, test_repo, stream_factory,
- configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast
- < TestRepository > (test_repo);
+ <minifi::io::StreamFactory>(configuration);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::shared_ptr<minifi::FlowController> controller =
- std::make_shared < minifi::FlowController
- > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true);
+ content_repo->initialize(configuration);
- core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory,
- configuration, test_file_location);
+ std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
+
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
+ content_repo,
+ DEFAULT_ROOT_GROUP_NAME,
+ true);
+
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory,
+ configuration,
+ test_file_location);
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
- test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup
- > (ptr.get());
+ test_file_location);
+ std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
+ >(ptr.get());
ptr.release();
controller->load();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/integration/TestExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/integration/TestExecuteProcess.cpp b/libminifi/test/integration/TestExecuteProcess.cpp
index ef0d113..5506c32 100644
--- a/libminifi/test/integration/TestExecuteProcess.cpp
+++ b/libminifi/test/integration/TestExecuteProcess.cpp
@@ -27,7 +27,7 @@
#include <memory>
#include <vector>
#include <fstream>
-
+#include "core/repository/VolatileContentRepository.h"
#include "../unit/ProvenanceTestHelper.h"
#include "FlowController.h"
#include "processors/GetFile.h"
@@ -47,15 +47,17 @@ int main(int argc, char **argv) {
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
processor->setMaxConcurrentTasks(1);
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo);
+ std::shared_ptr<core::Repository> test_repo =
+ std::make_shared<TestRepository>();
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::shared_ptr<TestRepository> repo =
+ std::static_pointer_cast<TestRepository>(test_repo);
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+ TestFlowController>(test_repo, test_repo, content_repo);
uuid_t processoruuid;
assert(true == processor->getUUID(processoruuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "executeProcessConnection");
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "executeProcessConnection");
connection->setRelationship(core::Relationship("success", "description"));
// link the connections so that we can test results at the end for this
@@ -79,7 +81,7 @@ int main(int argc, char **argv) {
core::ProcessorNode node2(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo);
+ std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo, test_repo);
core::ProcessSessionFactory factory(contextset.get());
processor->onSchedule(contextset.get(), &factory);
@@ -87,7 +89,7 @@ int main(int argc, char **argv) {
processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() {
core::ProcessorNode node(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo, test_repo);
context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5");
std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context.get());
while (!is_ready.load(std::memory_order_relaxed)) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/TestHTTPGet.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPGet.yml b/libminifi/test/resources/TestHTTPGet.yml
index 2f64f2a..58f95d9 100644
--- a/libminifi/test/resources/TestHTTPGet.yml
+++ b/libminifi/test/resources/TestHTTPGet.yml
@@ -32,7 +32,7 @@ Processors:
auto-terminated relationships list:
Properties:
HTTP Method: GET
- Remote URL: https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c
+ Remote URL: http://localhost:10003/geturl
- name: OhJeez
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.LogAttribute
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/TestHTTPGetSecure.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml
index f3a23e5..9d19632 100644
--- a/libminifi/test/resources/TestHTTPGetSecure.yml
+++ b/libminifi/test/resources/TestHTTPGetSecure.yml
@@ -33,7 +33,7 @@ Processors:
Properties:
SSL Context Service: SSLContextService
HTTP Method: GET
- Remote URL: https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c
+ Remote URL: https://raw.githubusercontent.com/apache/nifi-minifi-cpp/master/docs/minifi-logo.png
- name: OhJeez
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.LogAttribute
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/TestHTTPPost.yml
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/TestHTTPPost.yml b/libminifi/test/resources/TestHTTPPost.yml
index 837194d..c76069a 100644
--- a/libminifi/test/resources/TestHTTPPost.yml
+++ b/libminifi/test/resources/TestHTTPPost.yml
@@ -46,7 +46,7 @@ Processors:
auto-terminated relationships list: response
Properties:
HTTP Method: POST
- Remote URL: http://requestb.in/u8ax9uu8
+ Remote URL: http://localhost:10003/urlofchampions
- name: Loggit
id: 2438e3c8-015a-1000-79ca-83af40ec1993
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/cn.ckey.pem
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/cn.ckey.pem b/libminifi/test/resources/cn.ckey.pem
index 23017fa..fc42f06 100644
--- a/libminifi/test/resources/cn.ckey.pem
+++ b/libminifi/test/resources/cn.ckey.pem
@@ -1,5 +1,4 @@
Bag Attributes
- friendlyName: nifi-key
localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C
Key Attributes: <No Attributes>
-----BEGIN RSA PRIVATE KEY-----
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/cn.crt.pem
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/cn.crt.pem b/libminifi/test/resources/cn.crt.pem
index 3a786db..60a38ac 100644
--- a/libminifi/test/resources/cn.crt.pem
+++ b/libminifi/test/resources/cn.crt.pem
@@ -1,5 +1,4 @@
Bag Attributes
- friendlyName: nifi-key
localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C
subject=/OU=NIFI/CN=test
issuer=/OU=NIFI/CN=localhost
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/resources/nifi-cert.pem
----------------------------------------------------------------------
diff --git a/libminifi/test/resources/nifi-cert.pem b/libminifi/test/resources/nifi-cert.pem
index 4e404cd..0c3b7da 100644
--- a/libminifi/test/resources/nifi-cert.pem
+++ b/libminifi/test/resources/nifi-cert.pem
@@ -18,3 +18,30 @@ lvrRtWOqyGHiRoaRE5+VUjyO+0ToEgj9E+3rV8JL66BT7SWQusLGqbX1OoANCMTj
BRYeqB0g0PrXU+6chh6StpNSnYzkQdoxLUIDYYZx2XGsbkjDh/k6ni6bgJEKEOCu
T3Z2tyvGpc+PjLRXW/WyXCpg/xfr3+GSVKI6ark=
-----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAwCF6Tchue7tR66BPg886WOYNPgSwNaq1KJQSuGcEHK2wlAEu
+YfiYz9LbjFLZRLRY2CF9mIGb683byrnvOMcq6a+YdXDaOHZnkKBSsI/xTzScXTv3
+EKSueZ0sMuD7L0y/2Cs2lf8heBUEUqmNe15J9yvEQ1GpJ0j7iCCneKYjjezFWglR
+Sv/9suvqVCxIxr4j9gXODgyU3wdwIxkQUBJXk4GtDp03Rxcx6Ch0VBwjcGkYHhcs
+GHRzg6dcr795tLfOQNA/Vlje0+RtH/KU/WXgzl9nKtxD7XUwZyhoElzNcehN0WmK
+DgAmASncvy7+YYzKU69H14Q+2n/apdoqx/kTQQIDAQABAoIBAQCz7eY69+y4BXo3
+nz84Ipby8CcQoJVg/QiBAwLxHNCWBvdp9B069PQvFLo1FNWSaQ8XAW48p4yc7YHb
+vftRgfwnMyIlQdWrsP9WSz6FSZhkY9HX4rODK6aWD+J3l4jFCCxVxkpteKwgaBZP
+T6hHE8tTJfK8VLqEJu4g0uvjqjt7ydJT69lThdyf3VE0v6ZeSjsya5qqw+9RK+uC
+q5T/8FxeFZgpfR6UXXnoLAmAkfcMZNIBo6cOJWi/BQHjZdpCOVXUBtu0/lC8bffa
+4/ESaxRS8kOp+WEb64pT7u6F7yhD/kve6ZnJj/SX1EvN+RzB3zoVG42WUs/+/SwN
+dU1ERz+tAoGBAPbgZPDnWuKxW7Cam/Aqmvux624C1lNfhfXEGURhyc+wHWjjhWRe
+2vEPJOVxG5pN/FAo+lFoGiLe3QsLRLPlQrGfT/92W28QEcRrRSutjRZOL3wKezQA
+DkAPU9HX3lACR5yQD6+a0HHgMr1MqeNFPi9MPPjywGywTyWzHd4WQqvTAoGBAMc7
+J4fpr5uPVq9mKemK67i7meJ8AxjjU7oNe8EN+2XfCYcQUmgIo+dLzV9+DTrYkoTz
+iqjA6Ph2DNs6YHI/JNwsdSbAz6KVDteimt3t+uyNpiMGuyLmfOgpYEMJcHp+q6I6
+7PGKVS4c5iPFiYuIo23Is9ZMxOVQp76+UOy09rwbAoGBAOM5Za7VQjGkTGAf7ab/
+j+ZZu/dlZR8XrJSoCRmHZ9hgoLEJuJzJMXruFWeY028SmEivbrW+u0+dEJY5qOJr
+ARe7KkZXCZEPmUrP8Lpi4pjFHa9tdjhGVNdhRCTAKz442vCfJ9DZDUHCuPDCvxsP
+gEzIPtZjl/hxzmdElRj0JClBAoGAaXmfzAyjs6+HLQThW4r4kKyBI66T1TFEulM5
+GVPVrHEQEjlJ51nrrCAtckjBqE3QBCMLXZwDusaEt+uH8/QKB6Zhv0qEooZXfUHQ
+y32aQnIbap+9oxRzPFXraJIuwisdop2fo6Cgx/D0xitmTkDghNaknue1tdGlfQ40
+uZx0o9ECgYBeKeNbMnWoO46ZOrhaz8On+fIY7xtboV2bALy7lvUbWd9B41ntqYUm
+NHlYXDDU+Izs5wnNJnNnx4vECuUzYbpeY82dvMewlQwfl5aiyKrjo7VxLm//2U/K
+hlID6DU5wi9O+TAQ319DhxT7Ja+AQxO/OFS/mfrtwJEevxXqJLu55Q==
+-----END RSA PRIVATE KEY-----
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/FileStreamTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
new file mode 100644
index 0000000..5c86f19
--- /dev/null
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -0,0 +1,210 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
+#include "io/FileStream.h"
+#include <string>
+#include <vector>
+#include <uuid/uuid.h>
+#include "../TestBase.h"
+
+TEST_CASE("TestFileOverWrite", "[TestFiles]") {  // Seeks to offset 4 of a file holding "tempFile", writes the 4 bytes "file", and expects the whole content to read back as "tempfile".
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";  // presumably a mkdtemp-style template - confirm in TestBase.h
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";  // seed content: 8 bytes
+ file.close();
+
+ minifi::io::FileStream stream(path, 0, true);  // NOTE(review): the meaning of the 0 and true arguments is not visible here - see io/FileStream.h
+ std::vector<uint8_t> readBuffer;
+ REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());  // a full-size read must return the full size
+
+ uint8_t* data = readBuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");  // baseline: content is the seed text
+
+ stream.seek(4);
+
+ stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 4);  // const_cast only to satisfy the uint8_t* parameter; the literal is passed as the source of the write
+
+ stream.seek(0);  // rewind so the verification read starts at the beginning
+
+ std::vector<uint8_t> verifybuffer;
+
+ REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize());
+
+ data = verifybuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempfile");  // "File" replaced by "file", same total length
+
+ unlink(ss.str().c_str());  // remove the test file
+}
+
+TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {  // Expects a write with a valid buffer but length 0 to leave the file content ("tempFile") unchanged.
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";  // presumably a mkdtemp-style template - confirm in TestBase.h
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";  // seed content: 8 bytes
+ file.close();
+
+ minifi::io::FileStream stream(path, 0, true);  // NOTE(review): the meaning of the 0 and true arguments is not visible here - see io/FileStream.h
+ std::vector<uint8_t> readBuffer;
+ REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+ uint8_t* data = readBuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");  // baseline: content is the seed text
+
+ stream.seek(4);
+
+ stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 0);  // the case under test: non-null buffer, zero length
+
+ stream.seek(0);  // rewind so the verification read starts at the beginning
+
+ std::vector<uint8_t> verifybuffer;
+
+ REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize());
+
+ data = verifybuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile");  // content must be unchanged
+
+ unlink(ss.str().c_str());  // remove the test file
+}
+
+TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {  // Expects a write with a null buffer and length 0 to leave the file content ("tempFile") unchanged.
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";  // presumably a mkdtemp-style template - confirm in TestBase.h
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";  // seed content: 8 bytes
+ file.close();
+
+ minifi::io::FileStream stream(path, 0, true);  // NOTE(review): the meaning of the 0 and true arguments is not visible here - see io/FileStream.h
+ std::vector<uint8_t> readBuffer;
+ REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+ uint8_t* data = readBuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");  // baseline: content is the seed text
+
+ stream.seek(4);
+
+ stream.write(nullptr, 0);  // the case under test: null buffer, zero length
+
+ stream.seek(0);  // rewind so the verification read starts at the beginning
+
+ std::vector<uint8_t> verifybuffer;
+
+ REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize());
+
+ data = verifybuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile");  // content must be unchanged
+
+ unlink(ss.str().c_str());  // remove the test file
+}
+
+TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {  // Expects readData called with a null destination to return -1.
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";  // presumably a mkdtemp-style template - confirm in TestBase.h
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";  // seed content: 8 bytes
+ file.close();
+
+ minifi::io::FileStream stream(path, 0, true);  // NOTE(review): the meaning of the 0 and true arguments is not visible here - see io/FileStream.h
+ std::vector<uint8_t> readBuffer;
+ REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+ uint8_t* data = readBuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");  // baseline: content is the seed text
+
+ stream.seek(4);
+
+ stream.write(nullptr, 0);  // null buffer, zero length
+
+ stream.seek(0);
+
+ std::vector<uint8_t> verifybuffer;
+
+ REQUIRE(stream.readData(nullptr, stream.getSize()) == -1);  // the case under test; NOTE(review): nullptr presumably selects a pointer-taking readData overload - confirm in io/FileStream.h
+
+ data = verifybuffer.data();  // verifybuffer was never passed to readData, so it is still empty here
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");  // zero-length string built from the empty vector
+
+ unlink(ss.str().c_str());  // remove the test file
+}
+
+
+TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {  // Expects a read request larger than the file (8192 bytes asked, 8 available) to return 8 and yield exactly "tempFile".
+ TestController testController;
+ char format[] = "/tmp/gt.XXXXXX";  // presumably a mkdtemp-style template - confirm in TestBase.h
+ char *dir = testController.createTempDirectory(format);
+
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ std::string path = ss.str();
+ file.open(path, std::ios::out);
+ file << "tempFile";  // seed content: 8 bytes
+ file.close();
+
+ minifi::io::FileStream stream(path, 0, true);  // NOTE(review): the meaning of the 0 and true arguments is not visible here - see io/FileStream.h
+ std::vector<uint8_t> readBuffer;
+ REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize());
+
+ uint8_t* data = readBuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile");  // baseline: content is the seed text
+
+ stream.seek(0);  // rewind so the oversized read starts at the beginning
+
+ std::vector<uint8_t> verifybuffer;
+
+ REQUIRE(stream.readData(verifybuffer, 8192) == 8);  // the case under test: only the 8 bytes of "tempFile" are expected back
+
+ data = verifybuffer.data();
+
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile");  // the buffer must hold exactly the file content, not 8192 bytes
+
+ unlink(ss.str().c_str());  // remove the test file
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/InvokeHTTPTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp
index 705ac84..2ef3c17 100644
--- a/libminifi/test/unit/InvokeHTTPTests.cpp
+++ b/libminifi/test/unit/InvokeHTTPTests.cpp
@@ -25,6 +25,7 @@
#include <string>
#include <set>
#include "FlowController.h"
+#include "io/BaseStream.h"
#include "../TestBase.h"
#include "processors/GetFile.h"
#include "core/Core.h"
@@ -35,105 +36,9 @@
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
-TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
-
- std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
-
- uuid_t invokehttp_uuid;
- REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp");
-
- connection2->setRelationship(core::Relationship("No Retry", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor);
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(invokehttp);
-
- connection2->setSource(invokehttp);
-
- connection2->setSourceUUID(invokehttp_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(invokehttp_uuid);
-
- processor->addConnection(connection);
- invokehttp->addConnection(connection);
- invokehttp->addConnection(connection2);
-
- core::ProcessorNode node(processor);
- core::ProcessorNode node2(invokehttp);
-
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo);
- core::ProcessContext context2(node2, controller_services_provider, repo);
- context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8685");
- context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
-
- context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
- context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8685/testytesttest");
- core::ProcessSession session(&context);
- core::ProcessSession session2(&context2);
-
- REQUIRE(processor->getName() == "listenhttp");
-
- core::ProcessSessionFactory factory(&context);
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- core::ProcessSessionFactory factory2(&context2);
- invokehttp->onSchedule(&context2, &factory2);
- invokehttp->onTrigger(&context2, &session2);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
-
- reporter = session.getProvenanceReporter();
-
- records = reporter->getEvents();
- session.commit();
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- invokehttp->onTrigger(&context2, &session2);
-
- session2.commit();
- records = reporter->getEvents();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == processor->getName());
- }
- std::shared_ptr<core::FlowFile> ffr = session2.get();
- REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
- LogTestController::getInstance().reset();
-}
-
TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
TestController testController;
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
@@ -154,16 +59,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
uuid_t invokehttp_uuid;
REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
- std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+ std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
gcConnection->setRelationship(core::Relationship("success", "description"));
- std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, "logattribute");
+ std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
laConnection->setRelationship(core::Relationship("success", "description"));
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
connection->setRelationship(core::Relationship("success", "description"));
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp");
+ std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
connection2->setRelationship(core::Relationship("No Retry", "description"));
@@ -181,8 +86,8 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
core::ProcessorNode node(listenhttp);
core::ProcessorNode node2(invokehttp);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo);
- core::ProcessContext context2(node2, controller_services_provider, repo);
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo);
context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
@@ -242,9 +147,10 @@ class CallBack : public minifi::OutputStreamCallback {
}
virtual ~CallBack() {
}
- virtual void process(std::ofstream *stream) {
+ virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+ // leaving the typo for posterity sake
std::string st = "we're gnna write some test stuff";
- stream->write(st.c_str(), st.length());
+ return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length());
}
};
@@ -270,16 +176,18 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
uuid_t invokehttp_uuid;
REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
- std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+ std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
gcConnection->setRelationship(core::Relationship("success", "description"));
- std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, "logattribute");
+ std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
laConnection->setRelationship(core::Relationship("success", "description"));
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
connection->setRelationship(core::Relationship("success", "description"));
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp");
+ std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
connection2->setRelationship(core::Relationship("No Retry", "description"));
@@ -299,8 +207,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
core::ProcessorNode node(invokehttp);
core::ProcessorNode node2(listenhttp);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo);
- core::ProcessContext context2(node2, controller_services_provider, repo);
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo);
context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680");
context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
@@ -317,14 +225,9 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
CallBack callback;
- /*
- explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository,
- std::map<std::string, std::string> attributes,
- std::shared_ptr<ResourceClaim> claim = nullptr);
- */
std::map<std::string, std::string> attributes;
attributes["testy"] = "test";
- std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, attributes);
+ std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes);
session2.write(flow, &callback);
invokehttp->incrementActiveTasks();
@@ -368,3 +271,39 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
LogTestController::getInstance().reset();
}
+TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
+  // Runs a plan holding a ListenHTTP and an InvokeHTTP processor twice and
+  // expects the captured log to contain "exiting because method is POST".
+  using org::apache::nifi::minifi::processors::InvokeHTTP;
+  using org::apache::nifi::minifi::processors::ListenHTTP;
+
+  TestController testController;
+  LogTestController::getInstance().setInfo<InvokeHTTP>();
+  LogTestController::getInstance().setInfo<ListenHTTP>();
+  LogTestController::getInstance().setInfo<core::Processor>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> listener = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
+  std::shared_ptr<core::Processor> invoker = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+  REQUIRE(true == plan->setProperty(listener, ListenHTTP::Port.getName(), "8685"));
+  REQUIRE(true == plan->setProperty(listener, ListenHTTP::BasePath.getName(), "/testytesttest"));
+  REQUIRE(true == plan->setProperty(invoker, InvokeHTTP::Method.getName(), "POST"));
+  REQUIRE(true == plan->setProperty(invoker, InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
+
+  // First pass: neither a current flow file nor provenance events are expected.
+  plan->reset();
+  testController.runSession(plan, true);
+  std::set<provenance::ProvenanceEventRecord*> events = plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> flowFile = plan->getCurrentFlowFile();
+  REQUIRE(flowFile == nullptr);
+  REQUIRE(events.empty());
+
+  // Second pass: any provenance event recorded must name the listener as its component type.
+  plan->reset();
+  testController.runSession(plan, true);
+  events = plan->getProvenanceRecords();
+  flowFile = plan->getCurrentFlowFile();
+  for (provenance::ProvenanceEventRecord *event : events) {
+    REQUIRE(event->getComponentType() == listener->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index 9e2d50c..7f34ba4 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -23,11 +23,12 @@
#include <vector>
#include <set>
#include <fstream>
-#include "../unit/ProvenanceTestHelper.h"
+
#include "../TestBase.h"
#include "processors/ListenHTTP.h"
#include "processors/LogAttribute.h"
#include "processors/GetFile.h"
+#include "../unit/ProvenanceTestHelper.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/Processor.h"
@@ -42,131 +43,12 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
REQUIRE(processor->getName() == "processorname");
}
-TEST_CASE("Test Find file", "[getfileCreate2]") {
- TestController testController;
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
- std::shared_ptr<org::apache::nifi::minifi::Configure> configure = std::make_shared<org::apache::nifi::minifi::Configure>();
-
- std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
- std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(configure), configure);
-
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- char format[] = "/tmp/gt.XXXXXX";
- char *dir = testController.createTempDirectory(format);
-
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor);
- connection->setDestination(processor);
-
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(processoruuid);
-
- processor->addConnection(connection);
- REQUIRE(dir != NULL);
-
- core::ProcessorNode node(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, test_repo);
- core::ProcessSessionFactory factory(&context);
- context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
- core::ProcessSession session(&context);
-
- processor->onSchedule(&context, &factory);
- REQUIRE(processor->getName() == "getfileCreate2");
-
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
-
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- REQUIRE(records.size() == 0);
-
- std::fstream file;
- std::stringstream ss;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
-
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
- unlink(ss.str().c_str());
- reporter = session.getProvenanceReporter();
-
- REQUIRE(processor->getName() == "getfileCreate2");
-
- records = reporter->getEvents();
-
- for (provenance::ProvenanceEventRecord *provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == processor->getName());
- }
- session.commit();
- std::shared_ptr<core::FlowFile> ffr = session.get();
-
- ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- REQUIRE(2 == repo->getRepoMap().size());
-
- for (auto entry : repo->getRepoMap()) {
- provenance::ProvenanceEventRecord newRecord;
- newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length());
-
- bool found = false;
- for (auto provRec : records) {
- if (provRec->getEventId() == newRecord.getEventId()) {
- REQUIRE(provRec->getEventId() == newRecord.getEventId());
- REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
- REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
- REQUIRE(provRec->getDetails() == newRecord.getDetails());
- REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
- found = true;
- break;
- }
- }
- if (!found) {
- throw std::runtime_error("Did not find record");
- }
- }
-
- core::ProcessorNode nodeReport(processorReport);
- core::ProcessContext contextReport(nodeReport, controller_services_provider, test_repo);
- core::ProcessSessionFactory factoryReport(&contextReport);
- core::ProcessSession sessionReport(&contextReport);
- processorReport->onSchedule(&contextReport, &factoryReport);
- std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast<
- org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport);
- taskReport->setBatchSize(1);
- std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> recordsReport;
- processorReport->incrementActiveTasks();
- processorReport->setScheduledState(core::ScheduledState::RUNNING);
- std::string jsonStr;
- repo->getProvenanceRecord(recordsReport, 1);
- taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, jsonStr);
- REQUIRE(recordsReport.size() == 1);
- REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
- REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
-}
-
TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
TestController testController;
-
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
char format[] = "/tmp/gt.XXXXXX";
@@ -175,7 +57,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
uuid_t processoruuid;
REQUIRE(true == processor->getUUID(processoruuid));
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "getfileCreate2Connection");
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection");
+
connection->setRelationship(core::Relationship("success", "description"));
// link the connections so that we can test results at the end for this
@@ -190,7 +73,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
core::ProcessorNode node(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, test_repo);
+ core::ProcessContext context(node, controller_services_provider, test_repo, test_repo);
core::ProcessSessionFactory factory(&context);
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
// replicate 10 threads
@@ -245,71 +128,59 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
- std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
- std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
char *dir = testController.createTempDirectory(format);
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
-
- uuid_t logattribute_uuid;
- REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection");
- connection->setRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "logattribute");
- connection2->setRelationship(core::Relationship("success", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(processor);
-
- // link the connections so that we can test results at the end for this
- connection->setDestination(logAttribute);
-
- connection2->setSource(logAttribute);
-
- connection2->setSourceUUID(logattribute_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(logattribute_uuid);
+ plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+ testController.runSession(plan, false);
+ std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+ std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+ REQUIRE(record == nullptr);
+ REQUIRE(records.size() == 0);
- processor->addConnection(connection);
- logAttribute->addConnection(connection);
- logAttribute->addConnection(connection2);
- REQUIRE(dir != NULL);
+ std::fstream file;
+ std::stringstream ss;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+ plan->reset();
+ testController.runSession(plan, false);
- core::ProcessorNode node(processor);
- core::ProcessorNode node2(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo);
- core::ProcessContext context2(node2, controller_services_provider, repo);
- context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
- core::ProcessSession session(&context);
- core::ProcessSession session2(&context2);
+ unlink(ss.str().c_str());
- REQUIRE(processor->getName() == "getfileCreate2");
+ records = plan->getProvenanceRecords();
+ record = plan->getCurrentFlowFile();
+ testController.runSession(plan, false);
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
+ records = plan->getProvenanceRecords();
+ record = plan->getCurrentFlowFile();
- core::ProcessSessionFactory factory(&context);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
+ REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
+ REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
+ REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
+ LogTestController::getInstance().reset();
+}
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- core::ProcessSessionFactory factory2(&context2);
- logAttribute->onSchedule(&context2, &factory2);
- logAttribute->onTrigger(&context2, &session2);
+TEST_CASE("Test Find file", "[getfileCreate3]") {
+ TestController testController;
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2");
+ std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
+ std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
+ plan->addProcessor(processorReport, "reporter", core::Relationship("success", "description"), false);
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
- record = session.get();
+ plan->setProperty(processor, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
+ testController.runSession(plan, false);
+ std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+ std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
@@ -319,26 +190,58 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
+ plan->reset();
+ testController.runSession(plan, false);
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onTrigger(&context, &session);
- unlink(ss.str().c_str());
- reporter = session.getProvenanceReporter();
-
- records = reporter->getEvents();
- session.commit();
-
- logAttribute->incrementActiveTasks();
- logAttribute->setScheduledState(core::ScheduledState::RUNNING);
- logAttribute->onTrigger(&context2, &session2);
+ records = plan->getProvenanceRecords();
+ record = plan->getCurrentFlowFile();
+ for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+ REQUIRE(provEventRecord->getComponentType() == processor->getName());
+ }
+ std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
+ REQUIRE(ffr != nullptr);
+ ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+ auto repo = std::static_pointer_cast<TestRepository>(plan->getProvenanceRepo());
+ REQUIRE(2 == repo->getRepoMap().size());
- records = reporter->getEvents();
+ for (auto entry : repo->getRepoMap()) {
+ provenance::ProvenanceEventRecord newRecord;
+ newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length());
- REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
- REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
- REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
- LogTestController::getInstance().reset();
+ bool found = false;
+ for (auto provRec : records) {
+ if (provRec->getEventId() == newRecord.getEventId()) {
+ REQUIRE(provRec->getEventId() == newRecord.getEventId());
+ REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
+ REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
+ REQUIRE(provRec->getDetails() == newRecord.getDetails());
+ REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw std::runtime_error("Did not find record");
+ }
+ }
+ std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast<
+ org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport);
+ taskReport->setBatchSize(1);
+ std::vector<std::shared_ptr<core::SerializableComponent>> recordsReport;
+ recordsReport.push_back(std::make_shared<provenance::ProvenanceEventRecord>());
+ processorReport->incrementActiveTasks();
+ processorReport->setScheduledState(core::ScheduledState::RUNNING);
+ std::string jsonStr;
+ std::size_t deserialized = 0;
+ repo->DeSerialize(recordsReport, deserialized);
+ std::function<void(core::ProcessContext*, core::ProcessSession*)> verifyReporter = [&](core::ProcessContext *context, core::ProcessSession *session) {
+ taskReport->getJsonReport(context, session, recordsReport, jsonStr);
+ REQUIRE(recordsReport.size() == 1);
+ REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
+ REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos);
+ };
+
+ testController.runSession(plan, false, verifyReporter);
}
int fileSize(const char *add) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 17e6078..1b39700 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -18,11 +18,22 @@
#ifndef LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_
#define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_
-#include "provenance/Provenance.h"
-#include "FlowController.h"
-#include "core/Repository.h"
-#include "core/repository/FlowFileRepository.h"
-#include "core/Core.h"
+#include <atomic>
+#include <cstdint>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include "core/repository/VolatileContentRepository.h"
+#include "../../include/core/Processor.h"
+#include "../../include/core/repository/FlowFileRepository.h"
+#include "../../include/Connection.h"
+#include "../../include/FlowController.h"
+#include "../../include/properties/Configure.h"
+#include "../../include/provenance/Provenance.h"
+
/**
* Test repository
*/
@@ -41,17 +52,22 @@ class TestRepository : public core::Repository {
}
- bool Put(std::string key, uint8_t *buf, int bufLen) {
+ bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen)));
return true;
}
+
+ virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
+ return Put(key, buffer, bufferSize);
+ }
+
// Delete
bool Delete(std::string key) {
repositoryResults.erase(key);
return true;
}
// Get
- bool Get(std::string key, std::string &value) {
+ bool Get(const std::string &key, std::string &value) {
auto result = repositoryResults.find(key);
if (result != repositoryResults.end()) {
value = result->second;
@@ -61,6 +77,39 @@ class TestRepository : public core::Repository {
}
}
+ virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t max_size) {  // Batch overload: stores nothing.
+ return false;  // always reports failure; store and max_size are not read
+ }
+
+ // Copies stored entries into the caller-supplied components, one entry per slot.
+ //   store    - pre-allocated components; slot i receives the i-th entry of repositoryResults in map iteration order.
+ //   max_size - output only: reset to 0 on entry, then set to the number of slots that were filled.
+ // Always returns true; the result of each component's own DeSerialize() is not propagated.
+ virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
+ max_size = 0;
+ for (const auto &entry : repositoryResults) {
+ // Check the bound before indexing: store.at() throws std::out_of_range on an empty or exhausted store.
+ if (max_size >= store.size()) {
+ break;
+ }
+ const std::shared_ptr<core::SerializableComponent> &eventRead = store.at(max_size);
+ eventRead->DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length());
+ // Advance to the next slot so each entry lands in its own component and max_size reports the count.
+ ++max_size;
+ }
+ return true;
+ }
+
+ virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) {  // Single-component overload: stores nothing.
+ return false;  // always reports failure; store is not read
+ }
+
+ // Looks up the entry stored under store->getUUIDStr() and hands its bytes to store->DeSerialize().
+ // Returns false when Get() reports a miss or when the component rejects the payload.
+ virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) {
+ std::string value;
+ // Report a missing key instead of feeding an empty buffer to the component and claiming success.
+ if (!Get(store->getUUIDStr(), value)) {
+ return false;
+ }
+ return store->DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(value.c_str())), value.size());
+ }
+
+ virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) {  // Raw-buffer overload: reads nothing from buffer.
+ return false;  // always reports failure
+ }
+
const std::map<std::string, std::string> &getRepoMap() const {  // Read-only access to the key -> bytes map holding everything stored so far.
return repositoryResults;
}
@@ -134,6 +183,9 @@ class TestFlowRepository : public core::repository::FlowFileRepository {
}
}
}
+
+ void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {  // Empty body: content_repo is not used here.
+ }
void run() {
// do nothing
@@ -145,8 +197,8 @@ class TestFlowRepository : public core::repository::FlowFileRepository {
class TestFlowController : public minifi::FlowController {
public:
- TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo)
- : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, "", true) {
+ TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo)  // Builds a FlowController over the given repositories with a default-constructed Configure.
+ : minifi::FlowController(repo, flow_file_repo,std::make_shared<minifi::Configure>(), nullptr, std::make_shared<core::repository::VolatileContentRepository>(), "", true) {  // NOTE(review): content_repo is never forwarded; a fresh VolatileContentRepository is passed instead -- confirm this is intended.
}
~TestFlowController() {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp
index 6a58597..97cb646 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -26,8 +26,8 @@
#include "provenance/Provenance.h"
#include "FlowFileRecord.h"
#include "core/Core.h"
-#include "core/repository/FlowFileRepository.h"
-#include "core/repository/VolatileRepository.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/repository/VolatileProvenanceRepository.h"
TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
@@ -49,7 +49,8 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
record1.Serialize(testRepository);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+ record2.setEventId(eventId);
+ REQUIRE(record2.DeSerialize(testRepository) == true);
REQUIRE(record2.getEventId() == record1.getEventId());
REQUIRE(record2.getComponentId() == record1.getComponentId());
REQUIRE(record2.getComponentType() == record1.getComponentType());
@@ -60,12 +61,13 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::string eventId = record1.getEventId();
std::map<std::string, std::string> attributes;
attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("ff", "./content_repository", 0, 0, 0);
- std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, attributes);
+ std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes);
record1.addChildFlowFile(ffr1);
@@ -75,7 +77,8 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
record1.Serialize(testRepository);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+ record2.setEventId(eventId);
+ REQUIRE(record2.DeSerialize(testRepository) == true);
REQUIRE(record1.getChildrenUuids().size() == 1);
REQUIRE(record2.getChildrenUuids().size() == 1);
std::string childId = record2.getChildrenUuids().at(0);
@@ -94,13 +97,14 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro
uint64_t sample = 65555;
- std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileRepository>();
+ std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
testRepository->initialize(0);
record1.setEventDuration(sample);
record1.Serialize(testRepository);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+ record2.setEventId(eventId);
+ REQUIRE(record2.DeSerialize(testRepository) == true);
REQUIRE(record2.getEventId() == record1.getEventId());
REQUIRE(record2.getComponentId() == record1.getComponentId());
REQUIRE(record2.getComponentType() == record1.getComponentType());
@@ -111,24 +115,26 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro
TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::string eventId = record1.getEventId();
std::map<std::string, std::string> attributes;
attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
- std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileRepository>();
+ std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileProvenanceRepository>();
frepo->initialize(0);
- std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, attributes);
+ std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes);
record1.addChildFlowFile(ffr1);
uint64_t sample = 65555;
- std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileRepository>();
+ std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
testRepository->initialize(0);
record1.setEventDuration(sample);
record1.Serialize(testRepository);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+ record2.setEventId(eventId);
+ REQUIRE(record2.DeSerialize(testRepository) == true);
REQUIRE(record1.getChildrenUuids().size() == 1);
REQUIRE(record2.getChildrenUuids().size() == 1);
std::string childId = record2.getChildrenUuids().at(0);
@@ -151,7 +157,8 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena
testRepository->initialize(0);
record1.setEventDuration(sample);
- record1.Serialize(testRepository);
+ REQUIRE(record1.Serialize(testRepository) == true);
provenance::ProvenanceEventRecord record2;
- REQUIRE(record2.DeSerialize(testRepository, eventId) == false);
+ record2.setEventId(eventId);
+ REQUIRE(record2.DeSerialize(testRepository) == false);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp
index 4424a93..3b18310 100644
--- a/libminifi/test/unit/RepoTests.cpp
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -23,7 +23,7 @@
#include "provenance/Provenance.h"
#include "FlowFileRecord.h"
#include "core/Core.h"
-#include "core/repository/FlowFileRepository.h"
+#include "../../include/core/repository/AtomicRepoEntries.h"
#include "properties/Configure.h"
TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
@@ -34,7 +34,8 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
repository->initialize(std::make_shared<minifi::Configure>());
- minifi::FlowFileRecord record(repository);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ minifi::FlowFileRecord record(repository, content_repo);
record.addAttribute("keyA", "");
@@ -50,8 +51,8 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
repository->initialize(std::make_shared<minifi::Configure>());
-
- minifi::FlowFileRecord record(repository);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ minifi::FlowFileRecord record(repository, content_repo);
record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
@@ -70,9 +71,10 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- minifi::FlowFileRecord record(repository);
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ minifi::FlowFileRecord record(repository, content_repo);
- minifi::FlowFileRecord record2(repository);
+ minifi::FlowFileRecord record2(repository, content_repo);
std::string uuid = record.getUUIDStr();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7c16d1bb/libminifi/test/unit/TailFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp
index e800b4c..eb33f8c 100644
--- a/libminifi/test/unit/TailFileTests.cpp
+++ b/libminifi/test/unit/TailFileTests.cpp
@@ -42,130 +42,137 @@ static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
+ try {
+ // Create and write to the test file
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+ TestController testController;
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- uuid_t logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ uuid_t logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
- connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->setRelationship(core::Relationship("success", "TailFile successful output"));
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(connection);
- connection->setSourceUUID(processoruuid);
+ connection->setSourceUUID(processoruuid);
- processor->addConnection(connection);
+ processor->addConnection(connection);
- core::ProcessorNode node(processor);
+ core::ProcessorNode node(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
- core::ProcessSession session(&context);
+ core::ProcessSession session(&context);
- REQUIRE(processor->getName() == "tailfile");
+ REQUIRE(processor->getName() == "tailfile");
- core::ProcessSessionFactory factory(&context);
+ core::ProcessSessionFactory factory(&context);
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ std::shared_ptr<core::FlowFile> ff = session.get();
+ REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles
- LogTestController::getInstance().reset();
- } catch (...) { }
+ LogTestController::getInstance().reset();
+ } catch (...) {
+ }
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
+ // Delete the test and state file.
+ std::remove(TMP_FILE);
+ std::remove(STATE_FILE);
}
-
TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
+ try {
+ // Create and write to the test file
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+ TestController testController;
+ LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- uuid_t processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- uuid_t logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ uuid_t processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ uuid_t logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection");
- connection->setRelationship(core::Relationship("success", "TailFile successful output"));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->setRelationship(core::Relationship("success", "TailFile successful output"));
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
- connection->setSourceUUID(processoruuid);
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(connection);
+ connection->setSourceUUID(processoruuid);
- processor->addConnection(connection);
+ processor->addConnection(connection);
- core::ProcessorNode node(processor);
+ core::ProcessorNode node(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
- core::ProcessSession session(&context);
+ core::ProcessSession session(&context);
- REQUIRE(processor->getName() == "tailfile");
+ REQUIRE(processor->getName() == "tailfile");
- core::ProcessSessionFactory factory(&context);
+ core::ProcessSessionFactory factory(&context);
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 2);
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ std::shared_ptr<core::FlowFile> ff = session.get();
+ REQUIRE(provRecords.size() == 2);
- LogTestController::getInstance().reset();
- } catch (...) { }
+ LogTestController::getInstance().reset();
+ } catch (...) {
+ }
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
+ // Delete the test and state file.
+ std::remove(TMP_FILE);
+ std::remove(STATE_FILE);
}