You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/10/19 12:13:09 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1458 Register the
hidden properties in InvokeHTTP
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 293eba7 MINIFICPP-1458 Register the hidden properties in InvokeHTTP
293eba7 is described below
commit 293eba74c392d7f33579b92edef4ba02c5c7e068
Author: aminadinari19 <am...@gmail.com>
AuthorDate: Tue Oct 19 14:08:04 2021 +0200
MINIFICPP-1458 Register the hidden properties in InvokeHTTP
Closes #1200
Signed-off-by: Marton Szasz <sz...@apache.org>
Co-authored-by: Ferenc Gerlits <fg...@gmail.com>
---
extensions/http-curl/processors/InvokeHTTP.cpp | 5 +
.../http-curl/tests/unit/InvokeHTTPTests.cpp | 198 +++++++++++----------
2 files changed, 105 insertions(+), 98 deletions(-)
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 564e0ee..5b3d881 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -163,6 +163,8 @@ void InvokeHTTP::initialize() {
properties.insert(DisablePeerVerification);
properties.insert(AlwaysOutputResponse);
properties.insert(FollowRedirects);
+ properties.insert(PropPutOutputAttributes);
+ properties.insert(PenalizeOnNoRetry);
setSupportedProperties(properties);
// Set the supported relationships
@@ -364,6 +366,9 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
logger_->log_trace("InvokeHTTP -- curl successful");
bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
+ if (putToAttribute) {
+ logger_->log_debug("Adding http response body to flow file attribute %s", put_attribute_name_);
+ }
const std::vector<char> &response_body = client.getResponseBody();
const std::vector<std::string> &response_headers = client.getHeaders();
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index f817ca1..41e9ce6 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -21,7 +21,6 @@
#include <string>
#include "io/BaseStream.h"
#include "TestBase.h"
-#include "processors/GetFile.h"
#include "core/Core.h"
#include "HTTPClient.h"
#include "InvokeHTTP.h"
@@ -34,92 +33,63 @@
#include "core/ProcessorNode.h"
#include "processors/LogAttribute.h"
#include "utils/gsl.h"
+#include "processors/GenerateFlowFile.h"
+
+namespace {
+class TestHTTPServer {
+ public:
+ TestHTTPServer();
+ static constexpr const char* PROCESSOR_NAME = "my_http_server";
+ static constexpr const char* URL = "http://localhost:8681/testytesttest";
+
+ private:
+ TestController test_controller_;
+ std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
+};
+
+TestHTTPServer::TestHTTPServer() {
+ std::shared_ptr<core::Processor> listen_http = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
+ test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest");
+ test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+ test_controller_.runSession(test_plan_);
+}
+} // namespace
TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
TestController testController;
- LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+ TestHTTPServer http_server;
+
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
- std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
- listenhttp->initialize();
-
std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
invokehttp->initialize();
- utils::Identifier processoruuid = listenhttp->getUUID();
- REQUIRE(processoruuid);
-
utils::Identifier invokehttp_uuid = invokehttp->getUUID();
REQUIRE(invokehttp_uuid);
- std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- gcConnection->addRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
- laConnection->addRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- connection->addRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
-
- connection2->addRelationship(core::Relationship("No Retry", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(listenhttp);
-
- connection2->setSourceUUID(invokehttp_uuid);
- connection->setSourceUUID(processoruuid);
- connection->setDestinationUUID(invokehttp_uuid);
-
- listenhttp->addConnection(connection);
- invokehttp->addConnection(connection);
- invokehttp->addConnection(connection2);
-
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
- std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(invokehttp);
std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
- context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
- context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
- context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
- context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest");
- auto session = std::make_shared<core::ProcessSession>(context);
- auto session2 = std::make_shared<core::ProcessSession>(context2);
-
- REQUIRE(listenhttp->getName() == "listenhttp");
-
- std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+ context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
+ context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, TestHTTPServer::URL);
- std::shared_ptr<core::FlowFile> record;
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onSchedule(context, factory);
- listenhttp->onTrigger(context, session);
+ auto session = std::make_shared<core::ProcessSession>(context);
invokehttp->incrementActiveTasks();
invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
- invokehttp->onSchedule(context2, factory2);
- invokehttp->onTrigger(context2, session2);
+ std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context);
+ invokehttp->onSchedule(context, factory2);
+ invokehttp->onTrigger(context, session);
auto reporter = session->getProvenanceReporter();
auto records = reporter->getEvents();
- record = session->get();
+ std::shared_ptr<core::FlowFile> record = session->get();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
- listenhttp->incrementActiveTasks();
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onTrigger(context, session);
-
reporter = session->getProvenanceReporter();
records = reporter->getEvents();
@@ -127,17 +97,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
invokehttp->incrementActiveTasks();
invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- invokehttp->onTrigger(context2, session2);
+ invokehttp->onTrigger(context, session);
- session2->commit();
+ session->commit();
records = reporter->getEvents();
-
+ // FIXME(fgerlits): this test is very weak, as `records` is empty
for (auto provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+ REQUIRE(provEventRecord->getComponentType() == TestHTTPServer::PROCESSOR_NAME);
}
- REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST"));
- LogTestController::getInstance().reset();
+ REQUIRE(LogTestController::getInstance().contains("Exiting because method is POST"));
}
class CallBack : public minifi::OutputStreamCallback {
@@ -156,15 +125,11 @@ class CallBack : public minifi::OutputStreamCallback {
TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
TestController testController;
- LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
- std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
listenhttp->initialize();
@@ -181,12 +146,6 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
- std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
- gcConnection->addRelationship(core::Relationship("success", "description"));
-
- std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
- laConnection->addRelationship(core::Relationship("success", "description"));
-
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
connection->addRelationship(core::Relationship("success", "description"));
@@ -260,34 +219,26 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
session2->commit();
records = reporter->getEvents();
-
+ // FIXME(fgerlits): this test is very weak, as `records` is empty
for (auto provEventRecord : records) {
REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
}
REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST"));
- LogTestController::getInstance().reset();
}
TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
TestController testController;
- LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+ TestHTTPServer http_server;
+
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
- LogTestController::getInstance().setInfo<core::Processor>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
- std::shared_ptr<core::Processor> listenhttp = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
- listenhttp->initialize();
- std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
- invokehttp->initialize();
+ std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp");
- REQUIRE(true == plan->setProperty(listenhttp, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685"));
- REQUIRE(true == plan->setProperty(listenhttp, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest"));
-
- REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST"));
- REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
- plan->reset();
- testController.runSession(plan, true);
+ plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST");
+ plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+ testController.runSession(plan);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
@@ -295,14 +246,65 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
REQUIRE(records.size() == 0);
plan->reset();
- testController.runSession(plan, true);
+ testController.runSession(plan);
records = plan->getProvenanceRecords();
-
+ // FIXME(fgerlits): this test is very weak, as `records` is empty
for (auto provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+ REQUIRE(provEventRecord->getComponentType() == TestHTTPServer::PROCESSOR_NAME);
}
REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST"));
- LogTestController::getInstance().reset();
+}
+
+TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
+ using processors::InvokeHTTP;
+
+ TestController testController;
+ TestHTTPServer http_server;
+
+ LogTestController::getInstance().setInfo<minifi::core::ProcessSession>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+ std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+ plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
+ plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+ invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+
+ constexpr const char* PENALIZE_LOG_PATTERN = "Penalizing [0-9a-f-]+ for [0-9]+ms at invokehttp";
+
+ SECTION("with penalize on no retry set to true") {
+ plan->setProperty(invokehttp, InvokeHTTP::PenalizeOnNoRetry.getName(), "true");
+ testController.runSession(plan);
+ REQUIRE(LogTestController::getInstance().matchesRegex(PENALIZE_LOG_PATTERN));
+ }
+
+ SECTION("with penalize on no retry set to false") {
+ plan->setProperty(invokehttp, InvokeHTTP::PenalizeOnNoRetry.getName(), "false");
+ testController.runSession(plan);
+ REQUIRE_FALSE(LogTestController::getInstance().matchesRegex(PENALIZE_LOG_PATTERN));
+ }
+}
+
+TEST_CASE("HTTPTestsPutResponseBodyinAttribute", "[httptest1]") {
+ using processors::InvokeHTTP;
+
+ TestController testController;
+ TestHTTPServer http_server;
+
+ LogTestController::getInstance().setDebug<InvokeHTTP>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+ std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+ plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
+ plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+ plan->setProperty(invokehttp, InvokeHTTP::PropPutOutputAttributes.getName(), "http.type");
+ invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Adding http response body to flow file attribute http.type"));
}