You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/10/19 12:13:09 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1458 Register the hidden properties in InvokeHTTP

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 293eba7  MINIFICPP-1458 Register the hidden properties in InvokeHTTP
293eba7 is described below

commit 293eba74c392d7f33579b92edef4ba02c5c7e068
Author: aminadinari19 <am...@gmail.com>
AuthorDate: Tue Oct 19 14:08:04 2021 +0200

    MINIFICPP-1458 Register the hidden properties in InvokeHTTP
    
    Closes #1200
    Signed-off-by: Marton Szasz <sz...@apache.org>
    Co-authored-by: Ferenc Gerlits <fg...@gmail.com>
---
 extensions/http-curl/processors/InvokeHTTP.cpp     |   5 +
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       | 198 +++++++++++----------
 2 files changed, 105 insertions(+), 98 deletions(-)

diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 564e0ee..5b3d881 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -163,6 +163,8 @@ void InvokeHTTP::initialize() {
   properties.insert(DisablePeerVerification);
   properties.insert(AlwaysOutputResponse);
   properties.insert(FollowRedirects);
+  properties.insert(PropPutOutputAttributes);
+  properties.insert(PenalizeOnNoRetry);
 
   setSupportedProperties(properties);
   // Set the supported relationships
@@ -364,6 +366,9 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
     logger_->log_trace("InvokeHTTP -- curl successful");
 
     bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
+    if (putToAttribute) {
+      logger_->log_debug("Adding http response body to flow file attribute %s", put_attribute_name_);
+    }
 
     const std::vector<char> &response_body = client.getResponseBody();
     const std::vector<std::string> &response_headers = client.getHeaders();
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index f817ca1..41e9ce6 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -21,7 +21,6 @@
 #include <string>
 #include "io/BaseStream.h"
 #include "TestBase.h"
-#include "processors/GetFile.h"
 #include "core/Core.h"
 #include "HTTPClient.h"
 #include "InvokeHTTP.h"
@@ -34,92 +33,63 @@
 #include "core/ProcessorNode.h"
 #include "processors/LogAttribute.h"
 #include "utils/gsl.h"
+#include "processors/GenerateFlowFile.h"
+
+namespace {
+class TestHTTPServer {
+ public:
+  TestHTTPServer();
+  static constexpr const char* PROCESSOR_NAME = "my_http_server";
+  static constexpr const char* URL = "http://localhost:8681/testytesttest";
+
+ private:
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
+};
+
+TestHTTPServer::TestHTTPServer() {
+  std::shared_ptr<core::Processor> listen_http = test_plan_->addProcessor("ListenHTTP", PROCESSOR_NAME);
+  test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest");
+  test_plan_->setProperty(listen_http, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8681");
+  test_controller_.runSession(test_plan_);
+}
+}  // namespace
 
 TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
   TestController testController;
-  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+  TestHTTPServer http_server;
+
   LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
 
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-  std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
-  std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
-  listenhttp->initialize();
-
   std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
   invokehttp->initialize();
 
-  utils::Identifier processoruuid = listenhttp->getUUID();
-  REQUIRE(processoruuid);
-
   utils::Identifier invokehttp_uuid = invokehttp->getUUID();
   REQUIRE(invokehttp_uuid);
 
-  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
-  gcConnection->addRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
-  laConnection->addRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
-  connection->addRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
-
-  connection2->addRelationship(core::Relationship("No Retry", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(listenhttp);
-
-  connection2->setSourceUUID(invokehttp_uuid);
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(invokehttp_uuid);
-
-  listenhttp->addConnection(connection);
-  invokehttp->addConnection(connection);
-  invokehttp->addConnection(connection2);
-
-  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
-  std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(invokehttp);
   std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
-  std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
-  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
-  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
 
-  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
-  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest");
-  auto session = std::make_shared<core::ProcessSession>(context);
-  auto session2 = std::make_shared<core::ProcessSession>(context2);
-
-  REQUIRE(listenhttp->getName() == "listenhttp");
-
-  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+  context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
+  context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, TestHTTPServer::URL);
 
-  std::shared_ptr<core::FlowFile> record;
-  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
-  listenhttp->onSchedule(context, factory);
-  listenhttp->onTrigger(context, session);
+  auto session = std::make_shared<core::ProcessSession>(context);
 
   invokehttp->incrementActiveTasks();
   invokehttp->setScheduledState(core::ScheduledState::RUNNING);
-  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
-  invokehttp->onSchedule(context2, factory2);
-  invokehttp->onTrigger(context2, session2);
+  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context);
+  invokehttp->onSchedule(context, factory2);
+  invokehttp->onTrigger(context, session);
 
   auto reporter = session->getProvenanceReporter();
   auto records = reporter->getEvents();
-  record = session->get();
+  std::shared_ptr<core::FlowFile> record = session->get();
   REQUIRE(record == nullptr);
   REQUIRE(records.size() == 0);
 
-  listenhttp->incrementActiveTasks();
-  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
-  listenhttp->onTrigger(context, session);
-
   reporter = session->getProvenanceReporter();
 
   records = reporter->getEvents();
@@ -127,17 +97,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
 
   invokehttp->incrementActiveTasks();
   invokehttp->setScheduledState(core::ScheduledState::RUNNING);
-  invokehttp->onTrigger(context2, session2);
+  invokehttp->onTrigger(context, session);
 
-  session2->commit();
+  session->commit();
   records = reporter->getEvents();
-
+  // FIXME(fgerlits): this test is very weak, as `records` is empty
   for (auto provEventRecord : records) {
-    REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+    REQUIRE(provEventRecord->getComponentType() == TestHTTPServer::PROCESSOR_NAME);
   }
 
-  REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST"));
-  LogTestController::getInstance().reset();
+  REQUIRE(LogTestController::getInstance().contains("Exiting because method is POST"));
 }
 
 class CallBack : public minifi::OutputStreamCallback {
@@ -156,15 +125,11 @@ class CallBack : public minifi::OutputStreamCallback {
 
 TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   TestController testController;
-  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+
   LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
 
   std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-  std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
-
-  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
-
   std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
   listenhttp->initialize();
 
@@ -181,12 +146,6 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
   content_repo->initialize(configuration);
 
-  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
-  gcConnection->addRelationship(core::Relationship("success", "description"));
-
-  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
-  laConnection->addRelationship(core::Relationship("success", "description"));
-
   std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
   connection->addRelationship(core::Relationship("success", "description"));
 
@@ -260,34 +219,26 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
 
   session2->commit();
   records = reporter->getEvents();
-
+  // FIXME(fgerlits): this test is very weak, as `records` is empty
   for (auto provEventRecord : records) {
     REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
   }
 
   REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST"));
-  LogTestController::getInstance().reset();
 }
 
 TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   TestController testController;
-  LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+  TestHTTPServer http_server;
+
   LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
-  LogTestController::getInstance().setInfo<core::Processor>();
 
   std::shared_ptr<TestPlan> plan = testController.createPlan();
-  std::shared_ptr<core::Processor> listenhttp = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
-  listenhttp->initialize();
-  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
-  invokehttp->initialize();
+  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp");
 
-  REQUIRE(true == plan->setProperty(listenhttp, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685"));
-  REQUIRE(true == plan->setProperty(listenhttp, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest"));
-
-  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST"));
-  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
-  plan->reset();
-  testController.runSession(plan, true);
+  plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST");
+  plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+  testController.runSession(plan);
 
   auto records = plan->getProvenanceRecords();
   std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
@@ -295,14 +246,65 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
   REQUIRE(records.size() == 0);
 
   plan->reset();
-  testController.runSession(plan, true);
+  testController.runSession(plan);
 
   records = plan->getProvenanceRecords();
-
+  // FIXME(fgerlits): this test is very weak, as `records` is empty
   for (auto provEventRecord : records) {
-    REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+    REQUIRE(provEventRecord->getComponentType() == TestHTTPServer::PROCESSOR_NAME);
   }
 
   REQUIRE(true == LogTestController::getInstance().contains("Exiting because method is POST"));
-  LogTestController::getInstance().reset();
+}
+
+TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
+  using processors::InvokeHTTP;
+
+  TestController testController;
+  TestHTTPServer http_server;
+
+  LogTestController::getInstance().setInfo<minifi::core::ProcessSession>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+  plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
+  plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+
+  constexpr const char* PENALIZE_LOG_PATTERN = "Penalizing [0-9a-f-]+ for [0-9]+ms at invokehttp";
+
+  SECTION("with penalize on no retry set to true") {
+    plan->setProperty(invokehttp, InvokeHTTP::PenalizeOnNoRetry.getName(), "true");
+    testController.runSession(plan);
+    REQUIRE(LogTestController::getInstance().matchesRegex(PENALIZE_LOG_PATTERN));
+  }
+
+  SECTION("with penalize on no retry set to false") {
+    plan->setProperty(invokehttp, InvokeHTTP::PenalizeOnNoRetry.getName(), "false");
+    testController.runSession(plan);
+    REQUIRE_FALSE(LogTestController::getInstance().matchesRegex(PENALIZE_LOG_PATTERN));
+  }
+}
+
+TEST_CASE("HTTPTestsPutResponseBodyinAttribute", "[httptest1]") {
+  using processors::InvokeHTTP;
+
+  TestController testController;
+  TestHTTPServer http_server;
+
+  LogTestController::getInstance().setDebug<InvokeHTTP>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> genfile = plan->addProcessor("GenerateFlowFile", "genfile");
+  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+  plan->setProperty(invokehttp, InvokeHTTP::Method.getName(), "GET");
+  plan->setProperty(invokehttp, InvokeHTTP::URL.getName(), TestHTTPServer::URL);
+  plan->setProperty(invokehttp, InvokeHTTP::PropPutOutputAttributes.getName(), "http.type");
+  invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelFailure, InvokeHTTP::RelNoRetry, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
+  testController.runSession(plan);
+
+  REQUIRE(LogTestController::getInstance().contains("Adding http response body to flow file attribute http.type"));
 }